]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add compression support for ranged queries
authorVed-vampir <akiselyova@mirantis.com>
Wed, 13 Apr 2016 11:35:27 +0000 (14:35 +0300)
committerAdam Kupczyk <akupczyk@mirantis.com>
Wed, 2 Nov 2016 10:31:10 +0000 (11:31 +0100)
Signed-off-by: Alyona Kiseleva <akiselyova@mirantis.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/vstart.sh

index e5f2366dad75f08a084a7f109b2608d128a55e57..437d5e5a35d07c241034255c90455a76d288ac57 100644 (file)
@@ -1267,7 +1267,8 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
   }
   // compression stuff
   if (need_decompress) {
-    ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part" << dendl;
+    ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
+
     CompressorRef compressor = Compressor::create(s->cct, cs_info.compression_type);
     if (!compressor.get()) {
       // if compressor isn't available - error, because cannot return decompressed data?
@@ -1275,12 +1276,47 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
                        << "for rgw, check rgw_compression_type config option" << dendl;
       return -EIO;
     } else {
-      bufferlist out_bl;
-      int cr = compressor->decompress(bl, out_bl);
-      if (cr < 0) {
-        lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
-        return cr;
+      bufferlist out_bl, in_bl;
+      bl_ofs = 0;
+      if (waiting.length() != 0) {
+        in_bl.append(waiting);
+        in_bl.append(bl);        
+        waiting.clear();
+      } else {
+        in_bl.claim(bl);
+      }
+      bl_len = in_bl.length();
+      
+      while (first_block <= last_block) {
+        bufferlist tmp, tmp_out;
+        int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs;
+        if (ofs_in_bl + cs_info.blocks[first_block].len > bl.length()) {
+          // not complete block, put it to waiting
+          int tail = bl.length() - ofs_in_bl;
+          bl.copy(ofs_in_bl, tail, waiting);
+          cur_ofs -= tail;
+          break;
+        }
+        bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
+        int cr = compressor->decompress(tmp, tmp_out);
+        if (cr < 0) {
+          lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
+          return cr;
+        }
+        if (first_block == last_block && partial_content)
+          out_bl.append(tmp_out.c_str(), q_len);
+        else
+          out_bl.append(tmp_out);
+        first_block++;
       }
+
+      if (first_data && partial_content && out_bl.length() != 0)
+        bl_ofs =  q_ofs;
+
+      if (first_data && out_bl.length() != 0)
+        first_data = false;
+
+      cur_ofs += bl_len;
       return send_response_data(out_bl, bl_ofs, out_bl.length());
     }
   }
@@ -1440,7 +1476,7 @@ void RGWGetObj::execute()
     lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
     goto done_err;
   }
-  if (need_decompress) {
+  if (need_decompress && !partial_content) {
     total_len = cs_info.orig_size;
   }
 
@@ -1457,6 +1493,44 @@ void RGWGetObj::execute()
 
   start = ofs;
 
+  if (need_decompress) {
+    if (partial_content) {
+      // if user set range, we need to calculate it in decompressed data
+      first_block = 0; last_block = 0;
+      if (cs_info.blocks.size() > 1) {
+        off_t i = 1;
+        while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs <= new_ofs) i++;
+        first_block = i - 1;
+        while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs < new_end) i++;
+        last_block = i - 1;
+      }
+    } else {
+      first_block = 0; last_block = cs_info.blocks.size() - 1;
+    }
+
+    ofs = cs_info.blocks[first_block].new_ofs;
+    end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
+
+    // check user range for correctness
+    if (new_ofs < 0) {
+      ldout(s->cct, 5) << "WARNING: uncorrect begin of the bytes range, get 0 instead" <<  dendl;
+      new_ofs = 0;
+    }
+    if (new_end >= cs_info.orig_size) {
+      ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
+                       << ")" <<  dendl;
+      new_end = cs_info.orig_size - 1;
+    }
+
+    q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs;
+    q_len = new_end - cs_info.blocks[last_block].old_ofs + 1;
+
+    first_data = true;
+    cur_ofs = ofs;
+    waiting.clear();
+
+  }
+
   /* STAT ops don't need data, and do no i/o */
   if (get_type() == RGW_OP_STAT_OBJ) {
     return;
@@ -3020,6 +3094,7 @@ void RGWPutObj::execute()
     RGWCompressionInfo cs_info;
     cs_info.compression_type = s->cct->_conf->rgw_compression_type;
     cs_info.orig_size = s->obj_size;
+    cs_info.blocks = processor->get_compression_blocks();
     ::encode(cs_info, tmp);
     attrs[RGW_ATTR_COMPRESSION] = tmp;
   }
index 9e9eef14873ce6aaf2acf4adf86abd7974705154..2431b7cf58503d6f3c12929703d9d5e138a2112c 100644 (file)
@@ -138,6 +138,11 @@ protected:
   // compression attrs
   RGWCompressionInfo cs_info;
   bool need_decompress;
+  off_t first_block, last_block;
+  off_t q_ofs, q_len;
+  bool first_data;
+  uint64_t cur_ofs;
+  bufferlist waiting;
 
   int init_common();
 public:
@@ -161,6 +166,13 @@ public:
     skip_manifest = false;
     is_slo = false;
     rgwx_stat = false;
+    need_decompress = false;
+    first_block = 0;
+    last_block = 0;
+    q_ofs = 0;
+    q_len = 0;
+    first_data = true;
+    cur_ofs = 0;
  }
 
   bool prefetch_data();
index abe23089de158003553c915d8528529e9da59a8b..cea6d3cfc8f8d0e99c912ee256e69dd91ef8261c 100644 (file)
@@ -2373,9 +2373,14 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   bool compression_enabled = store->ctx()->_conf->rgw_compression_type != "none";
   if ((ofs > 0 && compressed) ||                                // if previous part was compressed
       (ofs == 0 && compression_enabled)) {   // or it's the first part and flag is set
-    ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part" << dendl;
+    ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
     CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type);
     if (!compressor.get()) {
+      if (ofs > 0 && compressed) {
+        lderr(store->ctx()) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
+                            << " for next part, compression process failed" << dendl;
+        return -EIO;
+      }
       // if compressor isn't available - just do not use it with log warning?
       ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type 
                        << "for rgw, check rgw_compression_type config option" << dendl;
@@ -2383,12 +2388,24 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
       in_bl.claim(bl);
     } else {
       int cr = compressor->compress(bl, in_bl);
-      if (cr != 0) {
+      if (cr < 0) {
+        if (ofs > 0 && compressed) {
+          lderr(store->ctx()) << "Compression failed with exit code " << cr
+                              << " for next part, compression process failed" << dendl;
+          return -EIO;
+        }
         ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
         compressed = false;
         in_bl.claim(bl);
       } else {
         compressed = true;
+  
+        compression_block newbl;
+        int bs = blocks.size();
+        newbl.old_ofs = ofs;
+        newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
+        newbl.len = in_bl.length();
+        blocks.push_back(newbl);
       }
     }
   } else {
@@ -12893,7 +12910,10 @@ int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need
     } catch (buffer::error& err) {
       return -EIO;
     }
-    need_decompress = true;
+    if (cs_info.compression_type != "none")
+      need_decompress = true;
+    else
+      need_decompress = false;
     return 0;
   } else {
     need_decompress = false;
index 6b42c97e4f42a94059a083074408fa6e99a86e3e..4c67b4a02b555f93efa82a98299f901dd40fee19 100644 (file)
@@ -76,9 +76,33 @@ static inline void get_obj_bucket_and_oid_loc(const rgw_obj& obj, rgw_bucket& bu
 
 int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy);
 
+struct compression_block {
+  uint64_t old_ofs;
+  uint64_t new_ofs;
+  uint64_t len;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(old_ofs, bl);
+    ::encode(new_ofs, bl);
+    ::encode(len, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+     DECODE_START(1, bl);
+     ::decode(old_ofs, bl);
+     ::decode(new_ofs, bl);
+     ::decode(len, bl);
+     DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(compression_block)
+
 struct RGWCompressionInfo {
   string compression_type;
   uint64_t orig_size;
+  vector<compression_block> blocks;
 
   RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
 
@@ -86,6 +110,7 @@ struct RGWCompressionInfo {
     ENCODE_START(1, 1, bl);
     ::encode(compression_type, bl);
     ::encode(orig_size, bl);
+    ::encode(blocks, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -93,6 +118,7 @@ struct RGWCompressionInfo {
      DECODE_START(1, bl);
      ::decode(compression_type, bl);
      ::decode(orig_size, bl);
+     ::decode(blocks, bl);
      DECODE_FINISH(bl);
   }
 };
@@ -3267,6 +3293,7 @@ protected:
   bool compressed;
   RGWBucketInfo bucket_info;
   bool canceled;
+  vector<compression_block> blocks;
 
   virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
                           map<string, bufferlist>& attrs, ceph::real_time delete_at,
@@ -3292,6 +3319,7 @@ public:
 
   bool is_canceled() { return canceled; }
   bool is_compressed() { return compressed; }
+  const vector<compression_block>& get_compression_blocks() { return blocks; }
 }; /* RGWPutObjProcessor */
 
 struct put_obj_aio_info {
index cf322b9eee5cad21598c83515a05e47ff77e89cb..a829809cf75fefc753ce548e2720e7730d472762 100755 (executable)
@@ -16,6 +16,12 @@ if [ -n "$VSTART_DEST" ]; then
   CEPH_OUT_DIR=$VSTART_DEST/out
 fi
 
+mkdir -p .libs/compressor
+for f in `ls -d compressor/*/`; 
+do 
+    cp .libs/libceph_`basename $f`.so* .libs/compressor/;
+done
+
 # for running out of the CMake build directory
 if [ -e CMakeCache.txt ]; then
   # Out of tree build, learn source location from CMakeCache.txt