From: Igor Fedotov Date: Tue, 24 May 2016 17:05:14 +0000 (+0300) Subject: compressor: Extends decompressor interface to be able to provide compressed data... X-Git-Tag: v11.0.0~359^2~24 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=183db05a35cc3bd84b45f12a05780f5157659895;p=ceph.git compressor: Extends decompressor interface to be able to provide compressed data length. Signed-off-by: Igor Fedotov --- diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index f75c651c40e8..e1aa3c2d1e6e 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -33,7 +33,7 @@ class Compressor { virtual int decompress(const bufferlist &in, bufferlist &out) = 0; // this is a bit weird but we need non-const iterator to be in // alignment with decode methods - virtual int decompress(bufferlist::iterator &p, bufferlist &out) = 0; + virtual int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) = 0; static CompressorRef create(CephContext *cct, const string &type); }; diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h index 2fc748ae7fbb..677d7a329024 100644 --- a/src/compressor/snappy/SnappyCompressor.h +++ b/src/compressor/snappy/SnappyCompressor.h @@ -23,22 +23,28 @@ class CEPH_BUFFER_API BufferlistSource : public snappy::Source { bufferlist::iterator pb; + size_t remaining; public: - explicit BufferlistSource(bufferlist::iterator _pb): pb(_pb) {} + explicit BufferlistSource(bufferlist::iterator _pb, size_t _input_len): pb(_pb), remaining(_input_len) { + remaining = MIN(remaining, pb.get_remaining()); + } virtual ~BufferlistSource() {} - virtual size_t Available() const { return pb.get_remaining(); } + virtual size_t Available() const { return remaining; } virtual const char* Peek(size_t* len) { const char* data = NULL; *len = 0; - if(pb.get_remaining()) { + size_t avail = Available(); + if(avail) { auto ptmp = pb; - *len = ptmp.get_ptr_and_advance(pb.get_remaining(), &data); + *len = ptmp.get_ptr_and_advance(avail, &data); } return data; } virtual void Skip(size_t n) { + assert(n <= remaining); pb.advance(n); + remaining -= n; } bufferlist::iterator get_pos() const { return pb; } }; @@ -47,7 +53,7 @@ class SnappyCompressor : public Compressor { public: SnappyCompressor() : Compressor("snappy") {} int compress(const bufferlist &src, bufferlist &dst) override { - BufferlistSource source(const_cast(src).begin()); + BufferlistSource source(const_cast(src).begin(), src.length()); bufferptr ptr(snappy::MaxCompressedLength(src.length())); snappy::UncheckedByteArraySink sink(ptr.c_str()); snappy::Compress(&source, &sink); @@ -56,9 +62,9 @@ class SnappyCompressor : public Compressor { } int decompress(const bufferlist &src, bufferlist &dst) override { bufferlist::iterator i = const_cast(src).begin(); - return decompress(i, dst); + return decompress(i, src.length(), dst); } - int decompress(bufferlist::iterator &p, bufferlist &dst) override { + int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &dst) override { size_t res_len = 0; // Trick, decompress only need first 32bits buffer bufferlist::const_iterator ptmp = p; @@ -66,7 +72,7 @@ class SnappyCompressor : public Compressor { ptmp.copy(4, tmp); if (!snappy::GetUncompressedLength(tmp.c_str(), tmp.length(), &res_len)) return -1; - BufferlistSource source(p); + BufferlistSource source(p, compressed_len); bufferptr ptr(res_len); if (snappy::RawUncompress(&source, ptr.c_str())) { p = source.get_pos(); diff --git a/src/compressor/zlib/CompressionZlib.cc b/src/compressor/zlib/CompressionZlib.cc index d8a28fa064b0..27aba8cf464d 100644 --- a/src/compressor/zlib/CompressionZlib.cc +++ b/src/compressor/zlib/CompressionZlib.cc @@ -94,7 +94,7 @@ int CompressionZlib::compress(const bufferlist &in, bufferlist &out) return 0; } -int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out) +int CompressionZlib::decompress(bufferlist::iterator &p, size_t compressed_size, bufferlist &out) { int ret; unsigned have; @@ -115,11 +115,11 @@ int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out) } unsigned char c_out[max_len]; + size_t remaining = MIN( p.get_remaining(), compressed_size); + while(remaining) { - while(!p.end()) { - - long unsigned int len = p.get_ptr_and_advance(p.get_remaining(), &c_in); - + long unsigned int len = p.get_ptr_and_advance(remaining, &c_in); + remaining -= len; strm.avail_in = len; strm.next_in = (unsigned char*)c_in; @@ -147,5 +147,5 @@ int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out) int CompressionZlib::decompress(const bufferlist &in, bufferlist &out) { bufferlist::iterator i = const_cast(in).begin(); - return decompress(i, out); + return decompress(i, in.length(), out); } diff --git a/src/compressor/zlib/CompressionZlib.h b/src/compressor/zlib/CompressionZlib.h index 21acb65ac1ef..133f91232fb1 100644 --- a/src/compressor/zlib/CompressionZlib.h +++ b/src/compressor/zlib/CompressionZlib.h @@ -26,7 +26,7 @@ public: CompressionZlib() : Compressor("zlib") {} int compress(const bufferlist &in, bufferlist &out) override; int decompress(const bufferlist &in, bufferlist &out) override; - int decompress(bufferlist::iterator &p, bufferlist &out) override; + int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) override; }; diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 04b499736c85..2526ae3d5869 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -3186,9 +3186,7 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result) derr << __func__ << " can't load decompressor " << chdr.type << dendl; r = -EIO; } else { - bufferlist t; - i.copy(chdr.length, t); - r = compressor->decompress(t, *result); + r = compressor->decompress(i, chdr.length, *result); if (r < 0) { derr << __func__ << " decompression failed with exit code " << r << dendl; r = -EIO; diff --git a/src/test/compressor/compressor_example.h b/src/test/compressor/compressor_example.h index f02aafb18263..40ab660878c4 100644 --- a/src/test/compressor/compressor_example.h +++ b/src/test/compressor/compressor_example.h @@ -43,9 +43,9 @@ public: out = in; return 0; } - virtual int decompress(bufferlist::iterator &p, bufferlist &out) + virtual int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) { - p.copy(p.get_remaining(), out); + p.copy(MIN(p.get_remaining(), compressed_len), out); return 0; } }; diff --git a/src/test/compressor/test_compression_snappy.cc b/src/test/compressor/test_compression_snappy.cc index e8bfe1bba76e..296de4103b2c 100644 --- a/src/test/compressor/test_compression_snappy.cc +++ b/src/test/compressor/test_compression_snappy.cc @@ -36,6 +36,13 @@ TEST(SnappyCompressor, compress_decompress) bufferlist after; res = sp.decompress(out, after); EXPECT_EQ(res, 0); + + after.clear(); + size_t compressed_len = out.length(); + out.append_zero(12); + auto it = out.begin(); + res = sp.decompress(it, compressed_len, after); + EXPECT_EQ(res, 0); } TEST(SnappyCompressor, sharded_input_decompress) diff --git a/src/test/compressor/test_compression_zlib.cc b/src/test/compressor/test_compression_zlib.cc index e3706018ce30..642264b3380e 100644 --- a/src/test/compressor/test_compression_zlib.cc +++ b/src/test/compressor/test_compression_zlib.cc @@ -27,7 +27,7 @@ TEST(CompressionZlib, compress_decompress) { CompressionZlib sp; - EXPECT_STREQ(sp.get_type(), "zlib"); + EXPECT_STREQ(sp.get_type().c_str(), "zlib"); const char* test = "This is test text"; int len = strlen(test); bufferlist in, out; @@ -40,12 +40,19 @@ TEST(CompressionZlib, compress_decompress) bufferlist exp; exp.append(test); EXPECT_TRUE(exp.contents_equal(after)); + after.clear(); + size_t compressed_len = out.length(); + out.append_zero(12); + auto it = out.begin(); + res = sp.decompress(it, compressed_len, after); + EXPECT_EQ(res, 0); + EXPECT_TRUE(exp.contents_equal(after)); } TEST(CompressionZlib, compress_decompress_chunk) { CompressionZlib sp; - EXPECT_STREQ(sp.get_type(), "zlib"); + EXPECT_STREQ(sp.get_type().c_str(), "zlib"); const char* test = "This is test text"; buffer::ptr test2 ("1234567890", 10); int len = strlen(test);