virtual int decompress(const bufferlist &in, bufferlist &out) = 0;
// this is a bit weird but we need non-const iterator to be in
// alignment with decode methods
- virtual int decompress(bufferlist::iterator &p, bufferlist &out) = 0;
+ virtual int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) = 0;
static CompressorRef create(CephContext *cct, const string &type);
};
class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
bufferlist::iterator pb;
+ size_t remaining;
public:
- explicit BufferlistSource(bufferlist::iterator _pb): pb(_pb) {}
+ explicit BufferlistSource(bufferlist::iterator _pb, size_t _input_len): pb(_pb), remaining(_input_len) {
+ remaining = MIN(remaining, pb.get_remaining());
+ }
virtual ~BufferlistSource() {}
- virtual size_t Available() const { return pb.get_remaining(); }
+ virtual size_t Available() const { return remaining; }
virtual const char* Peek(size_t* len) {
const char* data = NULL;
*len = 0;
- if(pb.get_remaining()) {
+ size_t avail = Available();
+ if(avail) {
auto ptmp = pb;
- *len = ptmp.get_ptr_and_advance(pb.get_remaining(), &data);
+ *len = ptmp.get_ptr_and_advance(avail, &data);
}
return data;
}
virtual void Skip(size_t n) {
+ assert(n <= remaining);
pb.advance(n);
+ remaining -= n;
}
bufferlist::iterator get_pos() const { return pb; }
};
public:
SnappyCompressor() : Compressor("snappy") {}
int compress(const bufferlist &src, bufferlist &dst) override {
- BufferlistSource source(const_cast<bufferlist&>(src).begin());
+ BufferlistSource source(const_cast<bufferlist&>(src).begin(), src.length());
bufferptr ptr(snappy::MaxCompressedLength(src.length()));
snappy::UncheckedByteArraySink sink(ptr.c_str());
snappy::Compress(&source, &sink);
}
int decompress(const bufferlist &src, bufferlist &dst) override {
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
- return decompress(i, dst);
+ return decompress(i, src.length(), dst);
}
- int decompress(bufferlist::iterator &p, bufferlist &dst) override {
+ int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &dst) override {
size_t res_len = 0;
// Trick, decompress only need first 32bits buffer
bufferlist::const_iterator ptmp = p;
ptmp.copy(4, tmp);
if (!snappy::GetUncompressedLength(tmp.c_str(), tmp.length(), &res_len))
return -1;
- BufferlistSource source(p);
+ BufferlistSource source(p, compressed_len);
bufferptr ptr(res_len);
if (snappy::RawUncompress(&source, ptr.c_str())) {
p = source.get_pos();
return 0;
}
-int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out)
+int CompressionZlib::decompress(bufferlist::iterator &p, size_t compressed_size, bufferlist &out)
{
int ret;
unsigned have;
}
unsigned char c_out[max_len];
+ size_t remaining = MIN( p.get_remaining(), compressed_size);
+ while(remaining) {
- while(!p.end()) {
-
- long unsigned int len = p.get_ptr_and_advance(p.get_remaining(), &c_in);
-
+ long unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+ remaining -= len;
strm.avail_in = len;
strm.next_in = (unsigned char*)c_in;
int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
{
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
- return decompress(i, out);
+ return decompress(i, in.length(), out);
}
CompressionZlib() : Compressor("zlib") {}
int compress(const bufferlist &in, bufferlist &out) override;
int decompress(const bufferlist &in, bufferlist &out) override;
- int decompress(bufferlist::iterator &p, bufferlist &out) override;
+ int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) override;
};
derr << __func__ << " can't load decompressor " << chdr.type << dendl;
r = -EIO;
} else {
- bufferlist t;
- i.copy(chdr.length, t);
- r = compressor->decompress(t, *result);
+ r = compressor->decompress(i, chdr.length, *result);
if (r < 0) {
derr << __func__ << " decompression failed with exit code " << r << dendl;
r = -EIO;
out = in;
return 0;
}
- virtual int decompress(bufferlist::iterator &p, bufferlist &out)
+ virtual int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out)
{
- p.copy(p.get_remaining(), out);
+ p.copy(MIN(p.get_remaining(), compressed_len), out);
return 0;
}
};
bufferlist after;
res = sp.decompress(out, after);
EXPECT_EQ(res, 0);
+
+ after.clear();
+ size_t compressed_len = out.length();
+ out.append_zero(12);
+ auto it = out.begin();
+ res = sp.decompress(it, compressed_len, after);
+ EXPECT_EQ(res, 0);
}
TEST(SnappyCompressor, sharded_input_decompress)
TEST(CompressionZlib, compress_decompress)
{
CompressionZlib sp;
- EXPECT_STREQ(sp.get_type(), "zlib");
+ EXPECT_STREQ(sp.get_type().c_str(), "zlib");
const char* test = "This is test text";
int len = strlen(test);
bufferlist in, out;
bufferlist exp;
exp.append(test);
EXPECT_TRUE(exp.contents_equal(after));
+ after.clear();
+ size_t compressed_len = out.length();
+ out.append_zero(12);
+ auto it = out.begin();
+ res = sp.decompress(it, compressed_len, after);
+ EXPECT_EQ(res, 0);
+ EXPECT_TRUE(exp.contents_equal(after));
}
TEST(CompressionZlib, compress_decompress_chunk)
{
CompressionZlib sp;
- EXPECT_STREQ(sp.get_type(), "zlib");
+ EXPECT_STREQ(sp.get_type().c_str(), "zlib");
const char* test = "This is test text";
buffer::ptr test2 ("1234567890", 10);
int len = strlen(test);