]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move compression code in filters system
authorVed-vampir <akiselyova@mirantis.com>
Thu, 30 Jun 2016 12:55:16 +0000 (15:55 +0300)
committerAdam Kupczyk <akupczyk@mirantis.com>
Wed, 2 Nov 2016 10:35:22 +0000 (11:35 +0100)
Signed-off-by: Alyona Kiseleva <akiselyova@mirantis.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_compression.cc [new file with mode: 0644]
src/rgw/rgw_compression.h [new file with mode: 0644]
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_swift.cc
src/spdk
src/vstart.sh
src/xxHash

index 20984a0adc9b465b115d87798bbb00ad3f164398..bc70a83dffb53be84afd22c278ec28e16026ad2f 100644 (file)
@@ -29,6 +29,7 @@ set(rgw_a_srcs
   rgw_cache.cc
   rgw_client_io.cc
   rgw_common.cc
+  rgw_compression.cc
   rgw_cors.cc
   rgw_cors_s3.cc
   rgw_dencoder.cc
diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc
new file mode 100644 (file)
index 0000000..b58c0cd
--- /dev/null
@@ -0,0 +1,173 @@
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_compression.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+//------------RGWPutObj_Compress---------------
+
+int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
+{
+  bufferlist in_bl;
+
+  if (bl.length() > 0) {
+    // compression stuff
+    if ((ofs > 0 && compressed) ||                                // if previous part was compressed
+        (ofs == 0)) {                                             // or it's the first part
+      ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
+      CompressorRef compressor = Compressor::create(cct, cct->_conf->rgw_compression_type);
+      if (!compressor.get()) {
+        if (ofs > 0 && compressed) {
+          lderr(cct) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type
+                              << " for next part, compression process failed" << dendl;
+          return -EIO;
+        }
+        // if compressor isn't available - just do not use it with log warning?
+        ldout(cct, 5) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type 
+                      << " for rgw, check rgw_compression_type config option" << dendl;
+        compressed = false;
+        in_bl.claim(bl);
+      } else {
+        int cr = compressor->compress(bl, in_bl);
+        if (cr < 0) {
+          if (ofs > 0 && compressed) {
+            lderr(cct) << "Compression failed with exit code " << cr
+                       << " for next part, compression process failed" << dendl;
+            return -EIO;
+          }
+          ldout(cct, 5) << "Compression failed with exit code " << cr << dendl;
+          compressed = false;
+          in_bl.claim(bl);
+        } else {
+          compressed = true;
+    
+          compression_block newbl;
+          int bs = blocks.size();
+          newbl.old_ofs = ofs;
+          newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
+          newbl.len = in_bl.length();
+          blocks.push_back(newbl);
+        }
+      }
+    } else {
+      compressed = false;
+      in_bl.claim(bl);
+    }
+    // end of compression stuff
+  }
+
+  return next->handle_data(in_bl, ofs, phandle, pobj, again);
+}
+
+//----------------RGWGetObj_Decompress---------------------
+RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, 
+                                           RGWCompressionInfo* cs_info_, 
+                                           bool partial_content_,
+                                           RGWGetDataCB* next): RGWGetObj_Filter(next),
+                                                                cct(cct_),
+                                                                cs_info(cs_info_),
+                                                                partial_content(partial_content_),
+                                                                q_ofs(0),
+                                                                q_len(0),
+                                                                first_data(true),
+                                                                cur_ofs(0)
+{
+  compressor = Compressor::create(cct, cs_info->compression_type);
+  if (!compressor.get())
+    lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type 
+                     << " for rgw, check rgw_compression_type config option" << dendl;
+}
+
+int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+  ldout(cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
+
+  if (!compressor.get()) {
+    // if compressor isn't available - error, because cannot return decompressed data?
+    lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type 
+                     << " for rgw, check rgw_compression_type config option" << dendl;
+    return -EIO;
+  }
+  bufferlist out_bl, in_bl;
+  bl_ofs = 0;
+  if (waiting.length() != 0) {
+    in_bl.append(waiting);
+    in_bl.append(bl);        
+    waiting.clear();
+  } else {
+    in_bl.claim(bl);
+  }
+  bl_len = in_bl.length();
+  
+  while (first_block <= last_block) {
+    bufferlist tmp, tmp_out;
+    int ofs_in_bl = first_block->new_ofs - cur_ofs;
+    if (ofs_in_bl + (unsigned)first_block->len > bl_len) {
+      // not complete block, put it to waiting
+      int tail = bl_len - ofs_in_bl;
+      in_bl.copy(ofs_in_bl, tail, waiting);
+      cur_ofs -= tail;
+      break;
+    }
+    in_bl.copy(ofs_in_bl, first_block->len, tmp);
+    int cr = compressor->decompress(tmp, tmp_out);
+    if (cr < 0) {
+      lderr(cct) << "Compression failed with exit code " << cr << dendl;
+      return cr;
+    }
+    if (first_block == last_block && partial_content)
+      tmp_out.copy(0, q_len, out_bl);
+    else
+      out_bl.append(tmp_out);
+    first_block++;
+  }
+
+  if (first_data && partial_content && out_bl.length() != 0)
+    bl_ofs =  q_ofs;
+
+  if (first_data && out_bl.length() != 0)
+    first_data = false;
+
+  cur_ofs += bl_len;
+  return next->handle_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
+}
+
+void RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
+{
+  if (partial_content) {
+    // if user set range, we need to calculate it in decompressed data
+    first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
+    if (cs_info->blocks.size() > 1) {
+      vector<compression_block>::iterator fb, lb;
+      // not bad to use auto for lambda, I think
+      auto cmp_u = [] (off_t ofs, const compression_block& e) { return (unsigned)ofs < e.old_ofs; };
+      auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs < (unsigned)ofs; };
+      fb = upper_bound(cs_info->blocks.begin()+1,
+                       cs_info->blocks.end(),
+                       ofs,
+                       cmp_u);
+      first_block = fb - 1;
+      lb = lower_bound(fb,
+                       cs_info->blocks.end(),
+                       end,
+                       cmp_l);
+      last_block = lb - 1;
+    }
+  } else {
+    first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
+  }
+
+  q_ofs = ofs - first_block->old_ofs;
+  q_len = end - last_block->old_ofs + 1;
+
+  ofs = first_block->new_ofs;
+  end = last_block->new_ofs + last_block->len;
+
+  first_data = true;
+  cur_ofs = ofs;
+  waiting.clear();
+
+  next->fixup_range(ofs, end);
+}
diff --git a/src/rgw/rgw_compression.h b/src/rgw/rgw_compression.h
new file mode 100644 (file)
index 0000000..41e4bf8
--- /dev/null
@@ -0,0 +1,51 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RGW_COMPRESSION_H
+#define CEPH_RGW_COMPRESSION_H
+
+#include <vector>
+
+#include "compressor/Compressor.h"
+#include "rgw_op.h"
+
+class RGWGetObj_Decompress : public RGWGetObj_Filter
+{
+  CephContext* cct;
+  CompressorRef compressor;
+  RGWCompressionInfo* cs_info;
+  bool partial_content;
+  vector<compression_block>::iterator first_block, last_block;
+  off_t q_ofs, q_len;
+  bool first_data;
+  uint64_t cur_ofs;
+  bufferlist waiting;
+public:
+  RGWGetObj_Decompress(CephContext* cct_, 
+                       RGWCompressionInfo* cs_info_, 
+                       bool partial_content_,
+                       RGWGetDataCB* next);
+  virtual ~RGWGetObj_Decompress() {}
+
+  virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+  virtual void fixup_range(off_t& ofs, off_t& end) override;
+
+};
+
+class RGWPutObj_Compress : public RGWPutObj_Filter
+{
+  CephContext* cct;
+  bool compressed;
+  std::vector<compression_block> blocks;
+public:
+  RGWPutObj_Compress(CephContext* cct_, RGWPutObjDataProcessor* next) :  RGWPutObj_Filter(next), 
+                                                                         cct(cct_) {}
+  virtual ~RGWPutObj_Compress(){}
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override;
+
+  bool is_compressed() { return compressed; }
+  vector<compression_block>& get_compression_blocks() { return blocks; }
+
+}; /* RGWPutObj_Compress */
+
+#endif /* CEPH_RGW_COMPRESSION_H */
index da434a95c2522f1e16d578ea9a5d56049b1617a2..70b3626d63b98c8a040f5c5f76e6143ec36effca 100644 (file)
@@ -8,6 +8,8 @@
 #include <sstream>
 
 #include <boost/algorithm/string/predicate.hpp>
+#include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
 
 #include "common/Clock.h"
 #include "common/armor.h"
@@ -30,9 +32,8 @@
 #include "rgw_cors_s3.h"
 #include "rgw_rest_conn.h"
 #include "rgw_rest_s3.h"
-#include "rgw_lc.h"
-#include "rgw_lc_s3.h"
 #include "rgw_client_io.h"
+#include "rgw_compression.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/rgw/cls_rgw_client.h"
 
@@ -774,6 +775,16 @@ bool RGWOp::generate_cors_headers(string& origin, string& method, string& header
   return true;
 }
 
+int RGWOp::update_compressed_bucket_size(uint64_t obj_size, RGWBucketCompressionInfo& bucket_size)
+{
+  bucket_size.orig_size += obj_size;
+  bufferlist bs;
+  ::encode(bucket_size, bs);
+  s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
+  return store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                         &s->bucket_attrs);
+}
+
 int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
                                        const RGWObjEnt& ent,
                                        RGWAccessControlPolicy * const bucket_policy,
@@ -1265,62 +1276,6 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
     gc_invalidate_time = start_time;
     gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
   }
-  // compression stuff
-  if (need_decompress) {
-    ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
-
-    CompressorRef compressor = Compressor::create(s->cct, cs_info.compression_type);
-    if (!compressor.get()) {
-      // if compressor isn't available - error, because cannot return decompressed data?
-      lderr(s->cct) << "Cannot load compressor of type " << cs_info.compression_type 
-                       << "for rgw, check rgw_compression_type config option" << dendl;
-      return -EIO;
-    } else {
-      bufferlist out_bl, in_bl;
-      bl_ofs = 0;
-      if (waiting.length() != 0) {
-        in_bl.append(waiting);
-        in_bl.append(bl);        
-        waiting.clear();
-      } else {
-        in_bl.claim(bl);
-      }
-      bl_len = in_bl.length();
-      
-      while (first_block <= last_block) {
-        bufferlist tmp, tmp_out;
-        int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs;
-        if (ofs_in_bl + cs_info.blocks[first_block].len > bl_len) {
-          // not complete block, put it to waiting
-          int tail = bl_len - ofs_in_bl;
-          in_bl.copy(ofs_in_bl, tail, waiting);
-          cur_ofs -= tail;
-          break;
-        }
-        in_bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
-        int cr = compressor->decompress(tmp, tmp_out);
-        if (cr < 0) {
-          lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
-          return cr;
-        }
-        if (first_block == last_block && partial_content)
-          out_bl.append(tmp_out.c_str(), q_len);
-        else
-          out_bl.append(tmp_out);
-        first_block++;
-      }
-
-      if (first_data && partial_content && out_bl.length() != 0)
-        bl_ofs =  q_ofs;
-
-      if (first_data && out_bl.length() != 0)
-        first_data = false;
-
-      cur_ofs += bl_len;
-      return send_response_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
-    }
-  }
-  // end of compression stuff
   return send_response_data(bl, bl_ofs, bl_len);
 }
 
@@ -1383,7 +1338,8 @@ void RGWGetObj::execute()
   gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
 
   RGWGetObj_CB cb(this);
-  RGWGetDataCB* decrypt = nullptr;
+  RGWGetDataCB* filter = (RGWGetDataCB*)&cb;
+  boost::optional<RGWGetObj_Decompress> decompress;
   map<string, bufferlist>::iterator attr_iter;
 
   perfcounter->inc(l_rgw_get);
@@ -1447,7 +1403,7 @@ void RGWGetObj::execute()
         end = cs_info.orig_size - 1;
       }
 
-      if (ofs >= cs_info.orig_size) {
+      if ((unsigned)ofs >= cs_info.orig_size) {
         lderr(s->cct) << "ERROR: begin of the bytes range more than object size (" << cs_info.orig_size
                          << ")" <<  dendl;
         op_ret = -ERANGE;
@@ -1455,7 +1411,7 @@ void RGWGetObj::execute()
       } else
         new_ofs = ofs;
 
-      if (end >= cs_info.orig_size) {
+      if ((unsigned)end >= cs_info.orig_size) {
         ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
                          << ")" <<  dendl;
         new_end = cs_info.orig_size - 1;
@@ -1539,30 +1495,8 @@ void RGWGetObj::execute()
       goto done_err;
     }
 
-    if (partial_content) {
-
-      // if user set range, we need to calculate it in decompressed data
-      first_block = 0; last_block = 0;
-      if (cs_info.blocks.size() > 1) {
-        off_t i = 1;
-        while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs <= new_ofs) i++;
-        first_block = i - 1;
-        while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs < new_end) i++;
-        last_block = i - 1;
-      }
-    } else {
-      first_block = 0; last_block = cs_info.blocks.size() - 1;
-    }
-
-    q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs;
-    q_len = new_end - cs_info.blocks[last_block].old_ofs + 1;
-
-    new_ofs = cs_info.blocks[first_block].new_ofs;
-    new_end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
-
-    first_data = true;
-    cur_ofs = new_ofs;
-    waiting.clear();
+    decompress = boost::in_place(s->cct, &cs_info, partial_content, filter);
+    filter = &*decompress;
   }
 
   /* STAT ops don't need data, and do no i/o */
@@ -1577,21 +1511,10 @@ void RGWGetObj::execute()
 
   perfcounter->inc(l_rgw_get_b, new_end - new_ofs);
 
-  op_ret = this->get_decrypt_filter(&decrypt, cb);
-  if (op_ret < 0) {
-    goto done_err;
-  }
-  if (decrypt != nullptr) {
-    off_t tmp_ofs = ofs;
-    off_t tmp_end = end;
-    decrypt->fixup_range(tmp_ofs, tmp_end);
-    op_ret = read_op.iterate(tmp_ofs, tmp_end, decrypt);
-    if (op_ret >= 0)
-      op_ret = decrypt->flush();
-    delete decrypt;
-  }
-  else
-    op_ret = read_op.iterate(ofs, end, &cb);
+  filter->fixup_range(new_ofs, new_end);
+  op_ret = read_op.iterate(new_ofs, new_end, filter);
+  if (op_ret >= 0)
+    op_ret = filter->flush();
 
   perfcounter->tinc(l_rgw_get_lat,
                    (ceph_clock_now(s->cct) - start_time));
@@ -2006,9 +1929,12 @@ void RGWStatBucket::execute()
       op_ret = -EINVAL;
     }
   }
-  if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) {
-    ::decode(bucket.size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
-  }
+  RGWBucketCompressionInfo bcs;
+  int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bcs);
+  if (!res)
+    bucket.size = bcs.orig_size;
+  if (res < 0)
+      lderr(s->cct) << "Failed to read decompressed bucket size" << dendl;
 }
 
 int RGWListBucket::verify_permission()
@@ -2812,12 +2738,14 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, real_time *mtime, re
   info.size = s->obj_size;
   info.modified = real_clock::now();
   info.manifest = manifest;
-  if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
-    bool tmp;
-    RGWCompressionInfo cs_info;
-    rgw_compression_info_from_attrset(attrs, tmp, cs_info);
-    info.cs_info = cs_info;
+
+  bool compressed;
+  r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
+  if (r < 0) {
+    dout(1) << "cannot get compression info" << dendl;
+    return r;
   }
+
   ::encode(info, bl);
 
   string multipart_meta_obj = mp.get_meta();
@@ -2854,7 +2782,7 @@ RGWPutObjProcessor *RGWPutObj::select_processor(RGWObjectCtx& obj_ctx, bool *is_
   return processor;
 }
 
-void RGWPutObj::dispose_processor(RGWPutObjProcessor *processor)
+void RGWPutObj::dispose_processor(RGWPutObjDataProcessor *processor)
 {
   delete processor;
 }
@@ -2920,6 +2848,7 @@ int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
 void RGWPutObj::execute()
 {
   RGWPutObjProcessor *processor = NULL;
+  RGWPutObjDataProcessor *filter = NULL;
   char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
   char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
@@ -2933,6 +2862,10 @@ void RGWPutObj::execute()
   off_t fst;
   off_t lst;
   uint64_t bucket_size = 0;
+  RGWBucketCompressionInfo bucket_size;
+  int res;
+  bool compression_enabled;
+  boost::optional<RGWPutObj_Compress> compressor;
 
   bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
 
@@ -2993,6 +2926,9 @@ void RGWPutObj::execute()
 
   processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
 
+  // no filters by default
+  filter = processor;
+
   /* Handle object versioning of Swift API. */
   if (! multipart) {
     rgw_obj obj(s->bucket, s->object);
@@ -3014,7 +2950,11 @@ void RGWPutObj::execute()
 
   fst = copy_source_range_fst;
   lst = copy_source_range_lst;
-  processor->compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+  compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+  if (compression_enabled) {
+    compressor = boost::in_place(s->cct, filter);
+    filter = &*compressor;
+  }
 
   do {
     bufferlist data_in;
@@ -3061,7 +3001,7 @@ void RGWPutObj::execute()
       orig_data = data;
     }
 
-    op_ret = put_data_and_throttle(processor, data, ofs, need_to_wait);
+    op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
     if (op_ret < 0) {
       if (!need_to_wait || op_ret != -EEXIST) {
         ldout(s->cct, 20) << "processor->thottle_data() returned ret="
@@ -3079,6 +3019,8 @@ void RGWPutObj::execute()
       dispose_processor(processor);
       processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
 
+      filter = processor;
+
       string oid_rand;
       char buf[33];
       gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
@@ -3091,7 +3033,12 @@ void RGWPutObj::execute()
         goto done;
       }
 
-      op_ret = put_data_and_throttle(processor, data, ofs, false);
+      if (compression_enabled) {
+        compressor = boost::in_place(s->cct, filter);
+        filter = &*compressor;
+      }
+
+      op_ret = put_data_and_throttle(filter, data, ofs, false);
       if (op_ret < 0) {
         goto done;
       }
@@ -3143,24 +3090,21 @@ void RGWPutObj::execute()
 
   hash.Final(m);
 
-  if (processor->is_compressed()) {
+  if (compression_enabled && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
     cs_info.compression_type = s->cct->_conf->rgw_compression_type;
     cs_info.orig_size = s->obj_size;
-    cs_info.blocks = processor->get_compression_blocks();
+    cs_info.blocks = move(compressor->get_compression_blocks());
     ::encode(cs_info, tmp);
     attrs[RGW_ATTR_COMPRESSION] = tmp;
   }
 
-  // add attr to bucket to know original size of data
-  if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end())
-    ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
-  bucket_size += s->obj_size;
-  ::encode(bucket_size, bs);
-  s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
-        &s->bucket_attrs);
+  // add attr to bucket to know original  size of data
+  res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size);
+  if (res < 0)
+    lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl;
+  op_ret = update_compressed_bucket_size(s->obj_size, bucket_size);
   if (op_ret < 0)
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
          << " returned err=" << op_ret << dendl;
@@ -3244,6 +3188,22 @@ int RGWPostObj::verify_permission()
   return 0;
 }
 
+RGWPutObjProcessor *RGWPostObj::select_processor(RGWObjectCtx& obj_ctx)
+{
+  RGWPutObjProcessor *processor;
+
+  uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
+
+  processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
+
+  return processor;
+}
+
+void RGWPostObj::dispose_processor(RGWPutObjDataProcessor *processor)
+{
+  delete processor;
+}
+
 void RGWPostObj::pre_exec()
 {
   rgw_bucket_object_pre_exec(s);
@@ -3251,11 +3211,14 @@ void RGWPostObj::pre_exec()
 
 void RGWPostObj::execute()
 {
+  RGWPutObjDataProcessor *filter = NULL;
   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
   unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
   MD5 hash;
   buffer::list bl, aclbl;
   int len = 0;
+  bool compression_enabled;
+  boost::optional<RGWPutObj_Compress> compressor;
 
   // read in the data from the POST form
   op_ret = get_params();
@@ -3288,12 +3251,18 @@ void RGWPostObj::execute()
                                       s->req_id,
                                       s->bucket_info.versioning_enabled());
 
+  // no filters by default
+  filter = &processor;
+
   op_ret = processor.prepare(store, nullptr);
-  if (op_ret < 0) {
+  if (op_ret < 0)
     return;
-  }
 
-  processor.compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+  compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+  if (compression_enabled) {
+    compressor = boost::in_place(s->cct, filter);
+    filter = &*compressor;
+  }
 
   while (data_pending) {
      bufferlist data;
@@ -3308,7 +3277,7 @@ void RGWPostObj::execute()
        break;
 
      hash.Update((const byte *)data.c_str(), data.length());
-     op_ret = put_data_and_throttle(&processor, data, ofs, &hash, false);
+     op_ret = put_data_and_throttle(filter, data, ofs, false);
 
      ofs += len;
 
@@ -3347,12 +3316,12 @@ void RGWPostObj::execute()
     emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
   }
 
-  if (processor.is_compressed()) {
+  if (compression_enabled && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
     cs_info.compression_type = s->cct->_conf->rgw_compression_type;
     cs_info.orig_size = s->obj_size;
-    cs_info.blocks = processor.get_compression_blocks();
+    cs_info.blocks = move(compressor->get_compression_blocks());
     ::encode(cs_info, tmp);
     emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
   }
@@ -3760,22 +3729,26 @@ void RGWDeleteObj::execute()
       if (op_ret >= 0) {
         delete_marker = del_op.result.delete_marker;
         version_id = del_op.result.version_id;
-        if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) {
-          uint64_t bucket_size, deleted_size;
-          ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
-          if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
-            bool tmp;
-            RGWCompressionInfo cs_info;
-            rgw_compression_info_from_attrset(attrs, tmp, cs_info);
-            deleted_size = cs_info.orig_size;
-          } else
+        RGWBucketCompressionInfo bucket_size;
+        int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size);
+        if (res < 0)
+          lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl;
+        if (!res) {
+          uint64_t deleted_size;
+          bool tmp;
+          RGWCompressionInfo cs_info;
+          res = rgw_compression_info_from_attrset(attrs, tmp, cs_info);
+          if (res < 0) {
+            lderr(s->cct) << "ERROR: failed to decode compression info, cannot update stat" << dendl;
             deleted_size = s->obj_size;
-          bucket_size -= deleted_size;
-          bufferlist bs;
-          ::encode(bucket_size, bs);
-          s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
-          op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
-            &s->bucket_attrs);
+          } else {
+            if (tmp) 
+              deleted_size = cs_info.orig_size;
+            else
+              deleted_size = s->obj_size;
+          }
+          if (update_compressed_bucket_size(-deleted_size, bucket_size) < 0)
+            lderr(s->cct) << "ERROR: failed to update bucket stat" << dendl;
         }
       }
 
@@ -4957,14 +4930,14 @@ void RGWCompleteMultipart::execute()
         }
         int new_ofs; // offset in compression data for new part
         if (cs_info.blocks.size() > 0)
-          new_ofs = cs_info.blocks[cs_info.blocks.size() - 1].new_ofs + cs_info.blocks[cs_info.blocks.size() - 1].len;
+          new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
         else
           new_ofs = 0;
-        for (off_t i=0; i < obj_part.cs_info.blocks.size(); ++i) {
+        for (const auto& block : obj_part.cs_info.blocks) {
           compression_block cb;
-          cb.old_ofs = obj_part.cs_info.blocks[i].old_ofs + cs_info.orig_size;
+          cb.old_ofs = block.old_ofs + cs_info.orig_size;
           cb.new_ofs = new_ofs;
-          cb.len = obj_part.cs_info.blocks[i].len;
+          cb.len = block.len;
           cs_info.blocks.push_back(cb);
           new_ofs = cb.new_ofs + cb.len;
         } 
@@ -5112,18 +5085,20 @@ void RGWAbortMultipart::execute()
       }
         map<string, bufferlist> attrset;
         int y = get_obj_attrs(store, s, obj, attrset);
-        if (!y && attrset.find(RGW_ATTR_COMPRESSION) != attrset.end()) {
+        map<string, bufferlist>::iterator cmp = attrset.find(RGW_ATTR_COMPRESSION);
+        if (!y && cmp != attrset.end()) {
           RGWCompressionInfo cs_info;
-          bufferlist::iterator bliter = attrset[RGW_ATTR_COMPRESSION].begin();
+          bufferlist::iterator bliter = cmp->second.begin();
           try {
             ::decode(cs_info, bliter);
+            if (cs_info.compression_type != "none")
+              deleted_size += cs_info.orig_size;
+            else
+              deleted_size += obj_part.size;
           } catch (buffer::error& err) {
             ldout(s->cct, 5) << "Failed to get decompressed obj size" << dendl;
-          }
-          if (cs_info.compression_type != "none")
-            deleted_size += cs_info.orig_size;
-          else
             deleted_size += obj_part.size;
+          }
         } else
           deleted_size += obj_part.size;
     }
@@ -5152,15 +5127,13 @@ void RGWAbortMultipart::execute()
   }
 
   if (!op_ret) {
-    if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) {
-      uint64_t bucket_size;
-      ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
-      bucket_size -= deleted_size;
-      bufferlist bs;
-      ::encode(bucket_size, bs);
-      s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
-      op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
-        &s->bucket_attrs);
+    RGWBucketCompressionInfo bucket_size;
+    int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size);
+    if (res < 0)
+      lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl;
+    if (!res) {
+      if (update_compressed_bucket_size(-deleted_size, bucket_size) < 0)
+            lderr(s->cct) << "ERROR: failed to update bucket stat" << dendl;
     }
   }
 
index 01e21d5f9752e871f6281f97094888368dd7a1ff..6d5569207795307db86c9f1bbcbbc41b12257e16 100644 (file)
@@ -19,6 +19,7 @@
 #include <map>
 
 #include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
 
 #include "common/armor.h"
 #include "common/mime.h"
@@ -86,6 +87,8 @@ RGWOp() : s(nullptr), dialect_handler(nullptr), store(nullptr),
   }
   int read_bucket_cors();
   bool generate_cors_headers(string& origin, string& method, string& headers, string& exp_headers, unsigned *max_age);
+  // obj_size can be positive or negative
+  int update_compressed_bucket_size(uint64_t obj_size, RGWBucketCompressionInfo& bucket_size);
 
   virtual int verify_params() { return 0; }
   virtual bool prefetch_data() { return false; }
@@ -201,13 +204,6 @@ public:
   virtual RGWOpType get_type() { return RGW_OP_GET_OBJ; }
   virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; }
   virtual bool need_object_expiration() { return false; }
-  /**
-   * calculates filter used to decrypt RGW objects data
-   */
-  virtual int get_decrypt_filter(RGWGetDataCB** filter, RGWGetDataCB& cb) {
-    *filter = NULL;
-    return 0;
-  }
 };
 
 class RGWGetObj_CB : public RGWGetDataCB
@@ -225,30 +221,30 @@ public:
 class RGWGetObj_Filter : public RGWGetDataCB
 {
 protected:
-  RGWGetDataCB& next;
+  RGWGetDataCB* next;
 public:
-  RGWGetObj_Filter(RGWGetDataCB& next): next(next) {}
+  RGWGetObj_Filter(RGWGetDataCB* next): next(next) {}
   virtual ~RGWGetObj_Filter() {}
   /**
    * Passes data through filter.
    * Filter can modify content of bl.
    * When bl_len == 0 , it means 'flush
    */
-  virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
-    return next.handle_data(bl, bl_ofs, bl_len);
+  virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+    return next->handle_data(bl, bl_ofs, bl_len);
   }
   /**
    * Flushes any cached data. Used by RGWGetObjFilter.
    * Return logic same as handle_data.
    */
-  virtual int flush() {
-    return next.flush();
+  virtual int flush() override {
+    return next->flush();
   }
   /**
    * Allows filter to extend range required for successful filtering
    */
-  virtual void fixup_range(off_t& bl_ofs, off_t& bl_len) {
-    next.fixup_range(bl_ofs, bl_len);
+  virtual void fixup_range(off_t& ofs, off_t& end) override {
+    next->fixup_range(ofs, end);
   }
 };
 
@@ -759,7 +755,7 @@ public:
   }
 
   virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart);
-  void dispose_processor(RGWPutObjProcessor *processor);
+  void dispose_processor(RGWPutObjDataProcessor *processor);
 
   int verify_permission();
   void pre_exec();
@@ -779,16 +775,16 @@ public:
 class RGWPutObj_Filter : public RGWPutObjDataProcessor
 {
 protected:
-  RGWPutObjDataProcessor& next;
+  RGWPutObjDataProcessor* next;
 public:
-  RGWPutObj_Filter(RGWPutObjDataProcessor& next) :
+  RGWPutObj_Filter(RGWPutObjDataProcessor* next) :
   next(next){}
-  virtual ~RGWPutObj_Filter(){}
-  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) {
-    return next.handle_data(bl, ofs, phandle, again);
+  virtual ~RGWPutObj_Filter() {}
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override {
+    return next->handle_data(bl, ofs, phandle, pobj, again);
   }
-  virtual int throttle_data(void *handle, bool need_to_wait) {
-    return next.throttle_data(handle, need_to_wait);
+  virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override {
+    return next->throttle_data(handle, obj, need_to_wait);
   }
 }; /* RGWPutObj_Filter */
 
index ecdf104ea4fcf071117f529999df54e9d497f112..1b46ddfc45f02c17e9862d965f54f07a34aa7df0 100644 (file)
@@ -9,6 +9,7 @@
 
 #include <boost/format.hpp>
 #include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
 
 #include "common/ceph_json.h"
 #include "common/utf8.h"
@@ -2361,86 +2362,40 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
 {
   *again = false;
 
   *phandle = NULL;
 
-  bufferlist in_bl;
-
-  // compression stuff
-  if ((ofs > 0 && compressed) ||                                // if previous part was compressed
-      (ofs == 0 && compression_enabled)) {   // or it's the first part and flag is set
-    ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
-    CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type);
-    if (!compressor.get()) {
-      if (ofs > 0 && compressed) {
-        lderr(store->ctx()) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
-                            << " for next part, compression process failed" << dendl;
-        return -EIO;
-      }
-      // if compressor isn't available - just do not use it with log warning?
-      ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type 
-                       << "for rgw, check rgw_compression_type config option" << dendl;
-      compressed = false;
-      in_bl.claim(bl);
-    } else {
-      int cr = compressor->compress(bl, in_bl);
-      if (cr < 0) {
-        if (ofs > 0 && compressed) {
-          lderr(store->ctx()) << "Compression failed with exit code " << cr
-                              << " for next part, compression process failed" << dendl;
-          return -EIO;
-        }
-        ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
-        compressed = false;
-        in_bl.claim(bl);
-      } else {
-        compressed = true;
-  
-        compression_block newbl;
-        int bs = blocks.size();
-        newbl.old_ofs = ofs;
-        newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
-        newbl.len = in_bl.length();
-        blocks.push_back(newbl);
-      }
-    }
-  } else {
-    compressed = false;
-    in_bl.claim(bl);
-  }
-  // end of compression stuff
-
   if (extra_data_len) {
-    size_t extra_len = in_bl.length();
+    size_t extra_len = bl.length();
     if (extra_len > extra_data_len)
       extra_len = extra_data_len;
 
     bufferlist extra;
-    in_bl.splice(0, extra_len, &extra);
+    bl.splice(0, extra_len, &extra);
     extra_data_bl.append(extra);
 
     extra_data_len -= extra_len;
-    if (in_bl.length() == 0) {
+    if (bl.length() == 0) {
       return 0;
     }
   }
 
   uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
 
-  pending_data_bl.claim_append(in_bl);
+  pending_data_bl.claim_append(bl);
   if (pending_data_bl.length() < max_write_size)
     return 0;
 
-  pending_data_bl.splice(0, max_write_size, &in_bl);
+  pending_data_bl.splice(0, max_write_size, &bl);
 
   /* do we have enough data pending accumulated that needs to be written? */
   *again = (pending_data_bl.length() >= max_chunk_size);
 
   if (!data_ofs && !immutable_head()) {
-    first_chunk.claim(in_bl);
+    first_chunk.claim(bl);
     obj_len = (uint64_t)first_chunk.length();
     int r = prepare_next_part(obj_len);
     if (r < 0) {
@@ -2450,13 +2405,12 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
     return 0;
   }
   off_t write_ofs = data_ofs;
-  data_ofs = write_ofs + in_bl.length();
+  data_ofs = write_ofs + bl.length();
   bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
                                                         we could be racing with another upload, to the same
                                                         object and cleanup can be messy */
-  int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive);
+  int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
   if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
-    in_bl.clear();
     bl.clear();
   }
   return ret;
@@ -5262,9 +5216,10 @@ int RGWRados::Bucket::List::list_objects(int max, vector<RGWObjEnt> *result,
       bufferlist attr;
       map<string, bufferlist> attrset;
       int y = store->raw_obj_stat(cs_obj, NULL, NULL, NULL, &attrset, NULL, NULL);
-      if (!y && attrset.find(RGW_ATTR_COMPRESSION) != attrset.end()) {
+      map<string, bufferlist>::iterator cmp = attrset.find(RGW_ATTR_COMPRESSION);
+      if (!y && cmp != attrset.end()) {
         RGWCompressionInfo cs_info;
-        bufferlist::iterator bliter = attrset[RGW_ATTR_COMPRESSION].begin();
+        bufferlist::iterator bliter = cmp->second.begin();
         try {
           ::decode(cs_info, bliter);
         } catch (buffer::error& err) {
@@ -7401,8 +7356,9 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   attrs.erase(RGW_ATTR_ID_TAG);
   attrs.erase(RGW_ATTR_PG_VER);
   attrs.erase(RGW_ATTR_SOURCE_ZONE);
-  if (src_attrs.find(RGW_ATTR_COMPRESSION) != src_attrs.end())
-    attrs[RGW_ATTR_COMPRESSION] = src_attrs[RGW_ATTR_COMPRESSION];
+  map<string, bufferlist>::iterator cmp = src_attrs.find(RGW_ATTR_COMPRESSION);
+  if (cmp != src_attrs.end())
+    attrs[RGW_ATTR_COMPRESSION] = cmp->second;
 
   RGWObjManifest manifest;
   RGWObjState *astate = NULL;
@@ -12915,8 +12871,9 @@ int RGWRados::delete_obj_aio(rgw_obj& obj, rgw_bucket& bucket,
 }
 
 int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need_decompress, RGWCompressionInfo& cs_info) {
-  if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
-    bufferlist::iterator bliter = attrs[RGW_ATTR_COMPRESSION].begin();
+  map<string, bufferlist>::iterator value = attrs.find(RGW_ATTR_COMPRESSION);
+  if (value != attrs.end()) {
+    bufferlist::iterator bliter = value->second.begin();
     try {
       ::decode(cs_info, bliter);
     } catch (buffer::error& err) {
@@ -12933,4 +12890,17 @@ int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need
   }
 }
 
-
+int rgw_bucket_compression_info_from_attrset(map<string, bufferlist>& attrs, RGWBucketCompressionInfo& cs_info)
+{
+  map<string, bufferlist>::iterator cmp = attrs.find(RGW_ATTR_COMPRESSION);
+  if (cmp != attrs.end()) {
+    try {
+      ::decode(cs_info, cmp->second);
+    } catch (buffer::error& err) {
+      return -EIO;
+    }
+  } else {
+    return 1;
+  }
+  return 0;
+}
index 7406ded0d3eacad6a04233c4a25b192d8e51e9cb..e79f4a199e727f29e0e1d5de8e06f54abe055dcb 100644 (file)
@@ -105,6 +105,9 @@ struct RGWCompressionInfo {
   vector<compression_block> blocks;
 
   RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
+  RGWCompressionInfo(const RGWCompressionInfo& cs_info) : compression_type(cs_info.compression_type),
+                                                          orig_size(cs_info.orig_size),
+                                                          blocks(cs_info.blocks) {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -124,7 +127,28 @@ struct RGWCompressionInfo {
 };
 WRITE_CLASS_ENCODER(RGWCompressionInfo)
 
+struct RGWBucketCompressionInfo {
+  uint64_t orig_size;
+
+  RGWBucketCompressionInfo() : orig_size(0) {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(orig_size, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+     DECODE_START(1, bl);
+     ::decode(orig_size, bl);
+     DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(RGWBucketCompressionInfo)
+
 int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need_decompress, RGWCompressionInfo& cs_info);
+// return 0, if find and read; -EIO, if find and not read; 1 if not find
+int rgw_bucket_compression_info_from_attrset(map<string, bufferlist>& attrs, RGWBucketCompressionInfo& cs_info);
 
 struct RGWOLHInfo {
   rgw_obj target;
@@ -3319,10 +3343,8 @@ protected:
   RGWRados *store;
   RGWObjectCtx& obj_ctx;
   bool is_complete;
-  bool compressed;
   RGWBucketInfo bucket_info;
   bool canceled;
-  vector<compression_block> blocks;
 
   virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
                           map<string, bufferlist>& attrs, ceph::real_time delete_at,
@@ -3332,28 +3354,21 @@ public:
   RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), 
                                                                    obj_ctx(_obj_ctx), 
                                                                    is_complete(false), 
-                                                                   compressed(false), 
                                                                    bucket_info(_bi), 
-                                                                   canceled(false),
-                                                                   compression_enabled(false) {}
+                                                                   canceled(false) {}
   virtual ~RGWPutObjProcessor() {}
   virtual int prepare(RGWRados *_store, string *oid_rand) {
     store = _store;
     return 0;
   }
 
-  //virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again);
-  //virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
   virtual int complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
                        map<string, bufferlist>& attrs, ceph::real_time delete_at,
                        const char *if_match = NULL, const char *if_nomatch = NULL);
 
-  bool compression_enabled;
   CephContext *ctx();
 
   bool is_canceled() { return canceled; }
-  bool is_compressed() { return compressed; }
-  const vector<compression_block>& get_compression_blocks() { return blocks; }
 }; /* RGWPutObjProcessor */
 
 struct put_obj_aio_info {
index b482a2ace500804b5a8f6ea8b376dd20eb1c4b2d..977a73b67f50c68efa7cb180a1a9a78168474c18 100644 (file)
@@ -1240,7 +1240,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl,
                                                  const off_t bl_ofs,
                                                  const off_t bl_len)
 {
-  string content_type; int rr;
+  string content_type;
 
   if (sent_header) {
     goto send_data;
index c02b179490123a3212b0c0d23b69da13965d1552..9322c258084c6abdeefe00067f8b310a6e0d9a5a 160000 (submodule)
--- a/src/spdk
+++ b/src/spdk
@@ -1 +1 @@
-Subproject commit c02b179490123a3212b0c0d23b69da13965d1552
+Subproject commit 9322c258084c6abdeefe00067f8b310a6e0d9a5a
index a829809cf75fefc753ce548e2720e7730d472762..cf322b9eee5cad21598c83515a05e47ff77e89cb 100755 (executable)
@@ -16,12 +16,6 @@ if [ -n "$VSTART_DEST" ]; then
   CEPH_OUT_DIR=$VSTART_DEST/out
 fi
 
-mkdir -p .libs/compressor
-for f in `ls -d compressor/*/`; 
-do 
-    cp .libs/libceph_`basename $f`.so* .libs/compressor/;
-done
-
 # for running out of the CMake build directory
 if [ -e CMakeCache.txt ]; then
   # Out of tree build, learn source location from CMakeCache.txt
index 44a6297b298e59ab7452defe859f21ed8371aa1c..1f40c6511fa8dd9d2e337ca8c9bc18b3e87663c9 160000 (submodule)
@@ -1 +1 @@
-Subproject commit 44a6297b298e59ab7452defe859f21ed8371aa1c
+Subproject commit 1f40c6511fa8dd9d2e337ca8c9bc18b3e87663c9