]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move compressor plugin out of filter
authorCasey Bodley <cbodley@redhat.com>
Fri, 11 Nov 2016 14:53:34 +0000 (09:53 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 11 Nov 2016 19:06:47 +0000 (14:06 -0500)
if rgw_compression_type is set to "random", we need to use the same
plugin for each block of data

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_compression.cc
src/rgw/rgw_compression.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc

index ee4d460369069510f788e3a790911c8c85afe71b..3a193ce7f959ac44ee5665568dbb5d86f7fa79cb 100644 (file)
@@ -19,39 +19,26 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r
     if ((ofs > 0 && compressed) ||                                // if previous part was compressed
         (ofs == 0)) {                                             // or it's the first part
       ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
-      CompressorRef compressor = Compressor::create(cct, cct->_conf->rgw_compression_type);
-      if (!compressor.get()) {
-        if (ofs > 0 && compressed) {
-          lderr(cct) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type
-                              << " for next part, compression process failed" << dendl;
+      int cr = compressor->compress(bl, in_bl);
+      if (cr < 0) {
+        if (ofs > 0) {
+          lderr(cct) << "Compression failed with exit code " << cr
+              << " for next part, compression process failed" << dendl;
           return -EIO;
         }
-        // if compressor isn't available - just do not use it with log warning?
-        ldout(cct, 5) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type 
-                      << " for rgw, check rgw_compression_type config option" << dendl;
         compressed = false;
+        ldout(cct, 5) << "Compression failed with exit code " << cr
+            << " for first part, storing uncompressed" << dendl;
         in_bl.claim(bl);
       } else {
-        int cr = compressor->compress(bl, in_bl);
-        if (cr < 0) {
-          if (ofs > 0 && compressed) {
-            lderr(cct) << "Compression failed with exit code " << cr
-                       << " for next part, compression process failed" << dendl;
-            return -EIO;
-          }
-          ldout(cct, 5) << "Compression failed with exit code " << cr << dendl;
-          compressed = false;
-          in_bl.claim(bl);
-        } else {
-          compressed = true;
+        compressed = true;
     
-          compression_block newbl;
-          int bs = blocks.size();
-          newbl.old_ofs = ofs;
-          newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
-          newbl.len = in_bl.length();
-          blocks.push_back(newbl);
-        }
+        compression_block newbl;
+        int bs = blocks.size();
+        newbl.old_ofs = ofs;
+        newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
+        newbl.len = in_bl.length();
+        blocks.push_back(newbl);
       }
     } else {
       compressed = false;
index 012ba3202869dca45876a62e694ad7d02434bcb6..a649afdfe111f421fd0e1b4e165100c272c99734 100644 (file)
@@ -35,12 +35,13 @@ public:
 class RGWPutObj_Compress : public RGWPutObj_Filter
 {
   CephContext* cct;
-  bool compressed;
+  bool compressed{false};
+  CompressorRef compressor;
   std::vector<compression_block> blocks;
 public:
-  RGWPutObj_Compress(CephContext* cct_, RGWPutObjDataProcessor* next) :  RGWPutObj_Filter(next),
-                                                                         cct(cct_),
-                                                                         compressed(false) {}
+  RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
+                     RGWPutObjDataProcessor* next)
+    : RGWPutObj_Filter(next), cct(cct_), compressor(compressor) {}
   virtual ~RGWPutObj_Compress(){}
   virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override;
 
index 5d7fc74524e6f78d6e8d64cd592fe4315c57d937..4f35e9cddce5a9d85ce8005ff057cf26c1f85484 100644 (file)
@@ -2824,7 +2824,8 @@ void RGWPutObj::execute()
   
   off_t fst;
   off_t lst;
-  bool compression_enabled;
+  const auto& compression_type = s->cct->_conf->rgw_compression_type;
+  CompressorRef plugin;
   boost::optional<RGWPutObj_Compress> compressor;
 
   bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
@@ -2910,10 +2911,15 @@ void RGWPutObj::execute()
 
   fst = copy_source_range_fst;
   lst = copy_source_range_lst;
-  compression_enabled = s->cct->_conf->rgw_compression_type != "none";
-  if (compression_enabled) {
-    compressor = boost::in_place(s->cct, filter);
-    filter = &*compressor;
+  if (compression_type != "none") {
+    plugin = Compressor::create(s->cct, compression_type);
+    if (!plugin) {
+      ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
+          << compression_type << dendl;
+    } else {
+      compressor = boost::in_place(s->cct, plugin, filter);
+      filter = &*compressor;
+    }
   }
 
   do {
@@ -2993,8 +2999,8 @@ void RGWPutObj::execute()
         goto done;
       }
 
-      if (compression_enabled) {
-        compressor = boost::in_place(s->cct, filter);
+      if (compressor) {
+        compressor = boost::in_place(s->cct, plugin, filter);
         filter = &*compressor;
       }
 
@@ -3050,10 +3056,10 @@ void RGWPutObj::execute()
 
   hash.Final(m);
 
-  if (compression_enabled && compressor->is_compressed()) {
+  if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
-    cs_info.compression_type = s->cct->_conf->rgw_compression_type;
+    cs_info.compression_type = plugin->get_type_name();
     cs_info.orig_size = s->obj_size;
     cs_info.blocks = move(compressor->get_compression_blocks());
     ::encode(cs_info, tmp);
@@ -3168,7 +3174,6 @@ void RGWPostObj::execute()
   MD5 hash;
   buffer::list bl, aclbl;
   int len = 0;
-  bool compression_enabled;
   boost::optional<RGWPutObj_Compress> compressor;
 
   // read in the data from the POST form
@@ -3209,10 +3214,17 @@ void RGWPostObj::execute()
   if (op_ret < 0)
     return;
 
-  compression_enabled = s->cct->_conf->rgw_compression_type != "none";
-  if (compression_enabled) {
-    compressor = boost::in_place(s->cct, filter);
-    filter = &*compressor;
+  const auto& compression_type = s->cct->_conf->rgw_compression_type;
+  CompressorRef plugin;
+  if (compression_type != "none") {
+    plugin = Compressor::create(s->cct, compression_type);
+    if (!plugin) {
+      ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
+          << compression_type << dendl;
+    } else {
+      compressor = boost::in_place(s->cct, plugin, filter);
+      filter = &*compressor;
+    }
   }
 
   while (data_pending) {
@@ -3267,10 +3279,10 @@ void RGWPostObj::execute()
     emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
   }
 
-  if (compression_enabled && compressor->is_compressed()) {
+  if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
-    cs_info.compression_type = s->cct->_conf->rgw_compression_type;
+    cs_info.compression_type = plugin->get_type_name();
     cs_info.orig_size = s->obj_size;
     cs_info.blocks = move(compressor->get_compression_blocks());
     ::encode(cs_info, tmp);
index 352c87e7b7fc2cde36feb508b4fad6e044203985..fd80c2afe80e8215e87fdff0bb1cc17ae926ae2b 100644 (file)
@@ -7070,12 +7070,20 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   }
 
   boost::optional<RGWPutObj_Compress> compressor;
+  CompressorRef plugin;
 
   RGWPutObjDataProcessor *filter = &processor;
-  bool compression_enabled = cct->_conf->rgw_compression_type != "none";
-  if (compression_enabled) {
-    compressor = boost::in_place(cct, filter);
-    filter = &*compressor;
+
+  const auto& compression_type = cct->_conf->rgw_compression_type;
+  if (compression_type != "none") {
+    plugin = Compressor::create(cct, compression_type);
+    if (!plugin) {
+      ldout(cct, 1) << "Cannot load plugin for rgw_compression_type "
+          << compression_type << dendl;
+    } else {
+      compressor = boost::in_place(cct, plugin, filter);
+      filter = &*compressor;
+    }
   }
 
   RGWRadosPutObj cb(cct, filter, &processor, opstate, progress_cb, progress_data);
@@ -7142,10 +7150,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
        }
       }
     }
-    if (compression_enabled && compressor->is_compressed()) {
+    if (compressor && compressor->is_compressed()) {
       bufferlist tmp;
       RGWCompressionInfo cs_info;
-      cs_info.compression_type = cct->_conf->rgw_compression_type;
+      cs_info.compression_type = plugin->get_type_name();
       cs_info.orig_size = cb.get_data_len();
       cs_info.blocks = move(compressor->get_compression_blocks());
       ::encode(cs_info, tmp);