From d6b28cdb3955fe6260b0042de0133f449e636404 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Oct 2018 15:54:15 -0400 Subject: [PATCH] rgw: compression filter uses rgw::putobj::Pipe Signed-off-by: Casey Bodley --- src/rgw/rgw_compression.cc | 29 +++++++++++------------ src/rgw/rgw_compression.h | 11 +++++---- src/test/rgw/test_rgw_compression.cc | 35 +++++++--------------------- 3 files changed, 28 insertions(+), 47 deletions(-) diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc index 3ed47492c9f6e..6e2be182bf00c 100644 --- a/src/rgw/rgw_compression.cc +++ b/src/rgw/rgw_compression.cc @@ -8,20 +8,17 @@ //------------RGWPutObj_Compress--------------- -int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) +int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset) { - bufferlist in_bl; - if (*again) { - return next->handle_data(in_bl, ofs, phandle, pobj, again); - } - if (bl.length() > 0) { + bufferlist out; + if (in.length() > 0) { // compression stuff - if ((ofs > 0 && compressed) || // if previous part was compressed - (ofs == 0)) { // or it's the first part - ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; - int cr = compressor->compress(bl, in_bl); + if ((logical_offset > 0 && compressed) || // if previous part was compressed + (logical_offset == 0)) { // or it's the first part + ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl; + int cr = compressor->compress(in, out); if (cr < 0) { - if (ofs > 0) { + if (logical_offset > 0) { lderr(cct) << "Compression failed with exit code " << cr << " for next part, compression process failed" << dendl; return -EIO; @@ -29,24 +26,24 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r compressed = false; ldout(cct, 5) << "Compression failed with exit code " << cr << " for first part, storing uncompressed" << dendl; - in_bl.claim(bl); + out.claim(in); } else { compressed = true; compression_block newbl; size_t bs = blocks.size(); - newbl.old_ofs = ofs; + newbl.old_ofs = logical_offset; newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; - newbl.len = in_bl.length(); + newbl.len = out.length(); blocks.push_back(newbl); } } else { compressed = false; - in_bl.claim(bl); + out.claim(in); } // end of compression stuff } - return next->handle_data(in_bl, ofs, phandle, pobj, again); + return Pipe::process(std::move(out), logical_offset); } //----------------RGWGetObj_Decompress--------------------- diff --git a/src/rgw/rgw_compression.h b/src/rgw/rgw_compression.h index b95f91954a297..e0448044ceeeb 100644 --- a/src/rgw/rgw_compression.h +++ b/src/rgw/rgw_compression.h @@ -7,6 +7,7 @@ #include #include "compressor/Compressor.h" +#include "rgw_putobj.h" #include "rgw_op.h" class RGWGetObj_Decompress : public RGWGetObj_Filter @@ -31,7 +32,7 @@ public: }; -class RGWPutObj_Compress : public RGWPutObj_Filter +class RGWPutObj_Compress : public rgw::putobj::Pipe { CephContext* cct; bool compressed{false}; @@ -39,10 +40,10 @@ class RGWPutObj_Compress : public RGWPutObj_Filter std::vector blocks; public: RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor, - RGWPutObjDataProcessor* next) - : RGWPutObj_Filter(next), cct(cct_), compressor(compressor) {} - ~RGWPutObj_Compress() override{} - int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override; + rgw::putobj::DataProcessor *next) + : Pipe(next), cct(cct_), compressor(compressor) {} + + int process(bufferlist&& data, uint64_t logical_offset) override; bool is_compressed() { return compressed; } vector& get_compression_blocks() { return blocks; } diff --git a/src/test/rgw/test_rgw_compression.cc b/src/test/rgw/test_rgw_compression.cc index 939789f8ea6bb..1718ea1a889d8 100644 --- a/src/test/rgw/test_rgw_compression.cc +++ b/src/test/rgw/test_rgw_compression.cc @@ -48,20 +48,13 @@ public: } }; -class ut_put_sink: public RGWPutObjDataProcessor +class ut_put_sink: public rgw::putobj::DataProcessor { bufferlist sink; public: - ut_put_sink(){} - virtual ~ut_put_sink(){} - int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override - { - sink.append(bl); - *again = false; - return 0; - } - int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override + int process(bufferlist&& bl, uint64_t ofs) override { + sink.claim_append(bl); return 0; } bufferlist& get_sink() @@ -128,16 +121,10 @@ TEST(Compress, LimitedChunkSize) bufferlist bl; bl.append(bp); - void* handle; - rgw_raw_obj obj; - bool again = false; - ut_put_sink c_sink; RGWPutObj_Compress compressor(g_ceph_context, plugin, &c_sink); - compressor.handle_data(bl, 0, &handle, &obj, &again); - - bufferlist empty; - compressor.handle_data(empty, s, &handle, &obj, &again); + compressor.process(std::move(bl), 0); + compressor.process({}, s); // flush RGWCompressionInfo cs_info; cs_info.compression_type = plugin->get_type_name(); @@ -152,6 +139,7 @@ TEST(Compress, LimitedChunkSize) decompress.fixup_range(f_begin, f_end); decompress.handle_data(c_sink.get_sink(), 0, c_sink.get_sink().length()); + bufferlist empty; decompress.handle_data(empty, 0, 0); ASSERT_LE(d_sink.get_size(), (size_t)g_ceph_context->_conf->rgw_max_chunk_size); @@ -172,15 +160,9 @@ TEST(Compress, BillionZeros) bufferlist bl; bl.append(bp); - void* handle; - rgw_raw_obj obj; - bool again = false; - for (int i=0; i<1000;i++) - compressor.handle_data(bl, size*i, &handle, &obj, &again); - - bufferlist empty; - compressor.handle_data(empty, size*1000, &handle, &obj, &again); + compressor.process(bufferlist{bl}, size*i); + compressor.process({}, size*1000); // flush RGWCompressionInfo cs_info; cs_info.compression_type = plugin->get_type_name(); @@ -195,6 +177,7 @@ TEST(Compress, BillionZeros) decompress.fixup_range(f_begin, f_end); decompress.handle_data(c_sink.get_sink(), 0, c_sink.get_sink().length()); + bufferlist empty; decompress.handle_data(empty, 0, 0); ASSERT_EQ(d_sink.get_sink().length() , size*1000); -- 2.39.5