]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: compression filter uses rgw::putobj::Pipe
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:15 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_compression.cc
src/rgw/rgw_compression.h
src/test/rgw/test_rgw_compression.cc

index 3ed47492c9f6e0eb424646da567441f17bdcb90f..6e2be182bf00c4acc4d894487dbda94986cddd26 100644 (file)
@@ -8,20 +8,17 @@
 
 //------------RGWPutObj_Compress---------------
 
-int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again)
+int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
 {
-  bufferlist in_bl;
-  if (*again) {
-    return next->handle_data(in_bl, ofs, phandle, pobj, again);
-  }
-  if (bl.length() > 0) {
+  bufferlist out;
+  if (in.length() > 0) {
     // compression stuff
-    if ((ofs > 0 && compressed) ||                                // if previous part was compressed
-        (ofs == 0)) {                                             // or it's the first part
-      ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
-      int cr = compressor->compress(bl, in_bl);
+    if ((logical_offset > 0 && compressed) || // if previous part was compressed
+        (logical_offset == 0)) {              // or it's the first part
+      ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
+      int cr = compressor->compress(in, out);
       if (cr < 0) {
-        if (ofs > 0) {
+        if (logical_offset > 0) {
           lderr(cct) << "Compression failed with exit code " << cr
               << " for next part, compression process failed" << dendl;
           return -EIO;
@@ -29,24 +26,24 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r
         compressed = false;
         ldout(cct, 5) << "Compression failed with exit code " << cr
             << " for first part, storing uncompressed" << dendl;
-        in_bl.claim(bl);
+        out.claim(in);
       } else {
         compressed = true;
     
         compression_block newbl;
         size_t bs = blocks.size();
-        newbl.old_ofs = ofs;
+        newbl.old_ofs = logical_offset;
         newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
-        newbl.len = in_bl.length();
+        newbl.len = out.length();
         blocks.push_back(newbl);
       }
     } else {
       compressed = false;
-      in_bl.claim(bl);
+      out.claim(in);
     }
     // end of compression stuff
   }
-  return next->handle_data(in_bl, ofs, phandle, pobj, again);
+  return Pipe::process(std::move(out), logical_offset);
 }
 
 //----------------RGWGetObj_Decompress---------------------
index b95f91954a297a24e281954c2c89c4556e19929c..e0448044ceeeb2a61295f18cb0f9ab971aaa83fe 100644 (file)
@@ -7,6 +7,7 @@
 #include <vector>
 
 #include "compressor/Compressor.h"
+#include "rgw_putobj.h"
 #include "rgw_op.h"
 
 class RGWGetObj_Decompress : public RGWGetObj_Filter
@@ -31,7 +32,7 @@ public:
 
 };
 
-class RGWPutObj_Compress : public RGWPutObj_Filter
+class RGWPutObj_Compress : public rgw::putobj::Pipe
 {
   CephContext* cct;
   bool compressed{false};
@@ -39,10 +40,10 @@ class RGWPutObj_Compress : public RGWPutObj_Filter
   std::vector<compression_block> blocks;
 public:
   RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
-                     RGWPutObjDataProcessor* next)
-    : RGWPutObj_Filter(next), cct(cct_), compressor(compressor) {}
-  ~RGWPutObj_Compress() override{}
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override;
+                     rgw::putobj::DataProcessor *next)
+    : Pipe(next), cct(cct_), compressor(compressor) {}
+
+  int process(bufferlist&& data, uint64_t logical_offset) override;
 
   bool is_compressed() { return compressed; }
   vector<compression_block>& get_compression_blocks() { return blocks; }
index 939789f8ea6bb299660ad1326ad60c58b2695ae9..1718ea1a889d8fdc1145f7ebca1956e289652e6a 100644 (file)
@@ -48,20 +48,13 @@ public:
   }
 };
 
-class ut_put_sink: public RGWPutObjDataProcessor
+class ut_put_sink: public rgw::putobj::DataProcessor
 {
   bufferlist sink;
 public:
-  ut_put_sink(){}
-  virtual ~ut_put_sink(){}
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override
-  {
-    sink.append(bl);
-    *again = false;
-    return 0;
-  }
-  int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override
+  int process(bufferlist&& bl, uint64_t ofs) override
   {
+    sink.claim_append(bl);
     return 0;
   }
   bufferlist&  get_sink()
@@ -128,16 +121,10 @@ TEST(Compress, LimitedChunkSize)
     bufferlist bl;
     bl.append(bp);
 
-    void* handle;
-    rgw_raw_obj obj;
-    bool again = false;
-
     ut_put_sink c_sink;
     RGWPutObj_Compress compressor(g_ceph_context, plugin, &c_sink);
-    compressor.handle_data(bl, 0, &handle, &obj, &again);
-
-    bufferlist empty;
-    compressor.handle_data(empty, s, &handle, &obj, &again);
+    compressor.process(std::move(bl), 0);
+    compressor.process({}, s); // flush
 
     RGWCompressionInfo cs_info;
     cs_info.compression_type = plugin->get_type_name();
@@ -152,6 +139,7 @@ TEST(Compress, LimitedChunkSize)
     decompress.fixup_range(f_begin, f_end);
 
     decompress.handle_data(c_sink.get_sink(), 0, c_sink.get_sink().length());
+    bufferlist empty;
     decompress.handle_data(empty, 0, 0);
 
     ASSERT_LE(d_sink.get_size(), (size_t)g_ceph_context->_conf->rgw_max_chunk_size);
@@ -172,15 +160,9 @@ TEST(Compress, BillionZeros)
   bufferlist bl;
   bl.append(bp);
 
-  void* handle;
-  rgw_raw_obj obj;
-  bool again = false;
-
   for (int i=0; i<1000;i++)
-    compressor.handle_data(bl, size*i, &handle, &obj, &again);
-
-  bufferlist empty;
-  compressor.handle_data(empty, size*1000, &handle, &obj, &again);
+    compressor.process(bufferlist{bl}, size*i);
+  compressor.process({}, size*1000); // flush
 
   RGWCompressionInfo cs_info;
   cs_info.compression_type = plugin->get_type_name();
@@ -195,6 +177,7 @@ TEST(Compress, BillionZeros)
   decompress.fixup_range(f_begin, f_end);
 
   decompress.handle_data(c_sink.get_sink(), 0, c_sink.get_sink().length());
+  bufferlist empty;
   decompress.handle_data(empty, 0, 0);
 
   ASSERT_EQ(d_sink.get_sink().length() , size*1000);