]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: compression: add multipart and copy support
authorVed-vampir <akiselyova@mirantis.com>
Thu, 28 Apr 2016 15:35:47 +0000 (18:35 +0300)
committerAdam Kupczyk <akupczyk@mirantis.com>
Wed, 2 Nov 2016 10:34:49 +0000 (11:34 +0100)
Signed-off-by: Alyona Kiseleva <akiselyova@mirantis.com>
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_swift.cc

index 437d5e5a35d07c241034255c90455a76d288ac57..def03bebe5ff9e41c495a0e9e9df49e1067cf950 100644 (file)
@@ -1290,14 +1290,14 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
       while (first_block <= last_block) {
         bufferlist tmp, tmp_out;
         int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs;
-        if (ofs_in_bl + cs_info.blocks[first_block].len > bl.length()) {
+        if (ofs_in_bl + cs_info.blocks[first_block].len > bl_len) {
           // not complete block, put it to waiting
-          int tail = bl.length() - ofs_in_bl;
-          bl.copy(ofs_in_bl, tail, waiting);
+          int tail = bl_len - ofs_in_bl;
+          in_bl.copy(ofs_in_bl, tail, waiting);
           cur_ofs -= tail;
           break;
         }
-        bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
+        in_bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
         int cr = compressor->decompress(tmp, tmp_out);
         if (cr < 0) {
           lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
@@ -1317,7 +1317,7 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
         first_data = false;
 
       cur_ofs += bl_len;
-      return send_response_data(out_bl, bl_ofs, out_bl.length());
+      return send_response_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
     }
   }
   // end of compression stuff
@@ -1388,6 +1388,7 @@ void RGWGetObj::execute()
 
   perfcounter->inc(l_rgw_get);
   int64_t new_ofs, new_end;
+  int cret;
 
   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
@@ -1417,8 +1418,55 @@ void RGWGetObj::execute()
   read_op.params.perr = &s->err;
 
   op_ret = read_op.prepare(&new_ofs, &new_end);
-  if (op_ret < 0)
+  if (op_ret < 0 && op_ret != -ERANGE) // check erange error later
+    goto done_err;
+
+  cret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
+  if (cret < 0) {
+    lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
+    if (op_ret == 0)
+      op_ret = cret;
     goto done_err;
+  }
+
+  if (op_ret == -ERANGE && !need_decompress) 
+    goto done_err;
+
+  if (need_decompress) {
+    op_ret = 0;
+    s->obj_size = cs_info.orig_size;
+
+    if (partial_content) {
+      // recheck user range for correctness
+      if (ofs < 0) {
+        ofs += cs_info.orig_size;
+        if (ofs < 0)
+          ofs = 0;
+        end = cs_info.orig_size - 1;
+      } else if (end < 0) {
+        end = cs_info.orig_size - 1;
+      }
+
+      if (ofs >= cs_info.orig_size) {
+        lderr(s->cct) << "ERROR: begin of the bytes range more than object size (" << cs_info.orig_size
+                         << ")" <<  dendl;
+        op_ret = -ERANGE;
+        goto done_err;
+      } else
+        new_ofs = ofs;
+
+      if (end >= cs_info.orig_size) {
+        ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
+                         << ")" <<  dendl;
+        new_end = cs_info.orig_size - 1;
+      } else
+        new_end = end;
+
+      total_len = new_end - new_ofs + 1;
+
+    } else
+      total_len = cs_info.orig_size;
+  }
 
   // for range requests with obj size 0
   if (range_str && !(s->obj_size)) {
@@ -1471,16 +1519,6 @@ void RGWGetObj::execute()
     return;
   }
 
-  op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
-  if (op_ret < 0) {
-    lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
-    goto done_err;
-  }
-  if (need_decompress && !partial_content) {
-    total_len = cs_info.orig_size;
-  }
-
-
   /* Check whether the object has expired. Swift API documentation
    * stands that we should return 404 Not Found in such case. */
   if (need_object_expiration() && object_is_expired(attrs)) {
@@ -1494,7 +1532,15 @@ void RGWGetObj::execute()
   start = ofs;
 
   if (need_decompress) {
+
+    if (cs_info.blocks.size() == 0) {
+      lderr(s->cct) << "ERROR: no info about compression blocks, cannot decompress" << dendl;
+      op_ret = -EIO;
+      goto done_err;
+    }
+
     if (partial_content) {
+
       // if user set range, we need to calculate it in decompressed data
       first_block = 0; last_block = 0;
       if (cs_info.blocks.size() > 1) {
@@ -1508,27 +1554,15 @@ void RGWGetObj::execute()
       first_block = 0; last_block = cs_info.blocks.size() - 1;
     }
 
-    ofs = cs_info.blocks[first_block].new_ofs;
-    end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
-
-    // check user range for correctness
-    if (new_ofs < 0) {
-      ldout(s->cct, 5) << "WARNING: uncorrect begin of the bytes range, get 0 instead" <<  dendl;
-      new_ofs = 0;
-    }
-    if (new_end >= cs_info.orig_size) {
-      ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
-                       << ")" <<  dendl;
-      new_end = cs_info.orig_size - 1;
-    }
-
     q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs;
     q_len = new_end - cs_info.blocks[last_block].old_ofs + 1;
 
+    new_ofs = cs_info.blocks[first_block].new_ofs;
+    new_end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
+
     first_data = true;
-    cur_ofs = ofs;
+    cur_ofs = new_ofs;
     waiting.clear();
-
   }
 
   /* STAT ops don't need data, and do no i/o */
@@ -1536,14 +1570,14 @@ void RGWGetObj::execute()
     return;
   }
 
-  if (!get_data || ofs > end) {
+  if (!get_data || new_ofs > new_end) {
     send_response_data(bl, 0, 0);
     return;
   }
 
-  perfcounter->inc(l_rgw_get_b, end - ofs);
+  perfcounter->inc(l_rgw_get_b, new_end - new_ofs);
 
-  op_ret = read_op.iterate(ofs, end, &cb);
+  op_ret = read_op.iterate(new_ofs, new_end, &cb);
 
   perfcounter->tinc(l_rgw_get_lat,
                    (ceph_clock_now(s->cct) - start_time));
@@ -2761,6 +2795,12 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, real_time *mtime, re
   info.size = s->obj_size;
   info.modified = real_clock::now();
   info.manifest = manifest;
+  if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
+    bool tmp;
+    RGWCompressionInfo cs_info;
+    rgw_compression_info_from_attrset(attrs, tmp, cs_info);
+    info.cs_info = cs_info;
+  }
   ::encode(info, bl);
 
   string multipart_meta_obj = mp.get_meta();
@@ -2957,6 +2997,7 @@ void RGWPutObj::execute()
 
   fst = copy_source_range_fst;
   lst = copy_source_range_lst;
+  processor->compression_enabled = s->cct->_conf->rgw_compression_type != "none";
 
   do {
     bufferlist data_in;
@@ -3079,7 +3120,6 @@ void RGWPutObj::execute()
     dout(10) << "v4 auth ok" << dendl;
 
   }
-
   op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
                               user_quota, bucket_quota, s->obj_size);
   if (op_ret < 0) {
@@ -3228,6 +3268,8 @@ void RGWPostObj::execute()
     return;
   }
 
+  processor.compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+
   while (data_pending) {
      bufferlist data;
      len = get_data(data);
@@ -3240,6 +3282,7 @@ void RGWPostObj::execute()
      if (!len)
        break;
 
+     hash.Update((const byte *)data.c_str(), data.length());
      op_ret = put_data_and_throttle(&processor, data, ofs, &hash, false);
 
      ofs += len;
@@ -3263,7 +3306,6 @@ void RGWPostObj::execute()
     return;
   }
 
-  processor.complete_hash(&hash);
   hash.Final(m);
   buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
 
@@ -3280,6 +3322,16 @@ void RGWPostObj::execute()
     emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
   }
 
+  if (processor.is_compressed()) {
+    bufferlist tmp;
+    RGWCompressionInfo cs_info;
+    cs_info.compression_type = s->cct->_conf->rgw_compression_type;
+    cs_info.orig_size = s->obj_size;
+    cs_info.blocks = processor.get_compression_blocks();
+    ::encode(cs_info, tmp);
+    emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
+  }
+
   op_ret = processor.complete(etag, NULL, real_time(), attrs, delete_at);
 }
 
@@ -4771,6 +4823,8 @@ void RGWCompleteMultipart::execute()
   int max_parts = 1000;
   int marker = 0;
   bool truncated;
+  RGWCompressionInfo cs_info;
+  bool compressed = false;
 
   uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size;
 
@@ -4852,6 +4906,32 @@ void RGWCompleteMultipart::execute()
         manifest.append(obj_part.manifest);
       }
 
+      if (obj_part.cs_info.compression_type != "none") {
+        if (compressed && cs_info.compression_type != obj_part.cs_info.compression_type) {
+          ldout(s->cct, 0) << "ERROR: compression type was changed during multipart upload ("
+                           << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
+          op_ret = -ERR_INVALID_PART;
+          return;
+        }
+        int new_ofs; // offset in compression data for new part
+        if (cs_info.blocks.size() > 0)
+          new_ofs = cs_info.blocks[cs_info.blocks.size() - 1].new_ofs + cs_info.blocks[cs_info.blocks.size() - 1].len;
+        else
+          new_ofs = 0;
+        for (off_t i=0; i < obj_part.cs_info.blocks.size(); ++i) {
+          compression_block cb;
+          cb.old_ofs = obj_part.cs_info.blocks[i].old_ofs + cs_info.orig_size;
+          cb.new_ofs = new_ofs;
+          cb.len = obj_part.cs_info.blocks[i].len;
+          cs_info.blocks.push_back(cb);
+          new_ofs = cb.new_ofs + cb.len;
+        } 
+        if (!compressed)
+          cs_info.compression_type = obj_part.cs_info.compression_type;
+        cs_info.orig_size += obj_part.cs_info.orig_size;
+        compressed = true;
+      }
+
       rgw_obj_key remove_key;
       src_obj.get_index_key(&remove_key);
 
@@ -4872,6 +4952,13 @@ void RGWCompleteMultipart::execute()
 
   attrs[RGW_ATTR_ETAG] = etag_bl;
 
+  if (compressed) {
+    // write compression attribute to full object
+    bufferlist tmp;
+    ::encode(cs_info, tmp);
+    attrs[RGW_ATTR_COMPRESSION] = tmp;
+  }
+
   target_obj.init(s->bucket, s->object.name);
   if (versioned_object) {
     store->gen_rand_obj_instance_name(&target_obj);
index cea6d3cfc8f8d0e99c912ee256e69dd91ef8261c..75df21a363b416d46d2341509480c8ca9aa5d053 100644 (file)
@@ -2370,7 +2370,6 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   bufferlist in_bl;
 
   // compression stuff
-  bool compression_enabled = store->ctx()->_conf->rgw_compression_type != "none";
   if ((ofs > 0 && compressed) ||                                // if previous part was compressed
       (ofs == 0 && compression_enabled)) {   // or it's the first part and flag is set
     ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
@@ -2414,7 +2413,6 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   }
   // end of compression stuff
 
-
   if (extra_data_len) {
     size_t extra_len = in_bl.length();
     if (extra_len > extra_data_len)
@@ -7393,6 +7391,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   attrs.erase(RGW_ATTR_ID_TAG);
   attrs.erase(RGW_ATTR_PG_VER);
   attrs.erase(RGW_ATTR_SOURCE_ZONE);
+  if (src_attrs.find(RGW_ATTR_COMPRESSION) != src_attrs.end())
+    attrs[RGW_ATTR_COMPRESSION] = src_attrs[RGW_ATTR_COMPRESSION];
 
   RGWObjManifest manifest;
   RGWObjState *astate = NULL;
@@ -9140,9 +9140,11 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend)
     end = astate->size - 1;
   }
 
+  int ret = 0;
+
   if (astate->size > 0) {
     if (ofs >= (off_t)astate->size) {
-      return -ERANGE;
+      ret = -ERANGE;
     }
     if (end >= (off_t)astate->size) {
       end = astate->size - 1;
@@ -9160,7 +9162,7 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend)
   if (params.lastmod)
     *params.lastmod = astate->mtime;
 
-  return 0;
+  return ret;
 }
 
 int RGWRados::SystemObject::get_state(RGWObjState **pstate, RGWObjVersionTracker *objv_tracker)
index 4c67b4a02b555f93efa82a98299f901dd40fee19..2ea19ce4addc81da05f213b426c0a174ca477110 100644 (file)
@@ -678,26 +678,30 @@ struct RGWUploadPartInfo {
   string etag;
   ceph::real_time modified;
   RGWObjManifest manifest;
+  RGWCompressionInfo cs_info;
 
   RGWUploadPartInfo() : num(0), size(0) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(3, 2, bl);
+    ENCODE_START(4, 2, bl);
     ::encode(num, bl);
     ::encode(size, bl);
     ::encode(etag, bl);
     ::encode(modified, bl);
     ::encode(manifest, bl);
+    ::encode(cs_info, bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::iterator& bl) {
-    DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+    DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
     ::decode(num, bl);
     ::decode(size, bl);
     ::decode(etag, bl);
     ::decode(modified, bl);
     if (struct_v >= 3)
       ::decode(manifest, bl);
+    if (struct_v >= 4)
+      ::decode(cs_info, bl);
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
@@ -3300,7 +3304,13 @@ protected:
                           const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
 
 public:
-  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), compressed(false), bucket_info(_bi), canceled(false) {}
+  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), 
+                                                                   obj_ctx(_obj_ctx), 
+                                                                   is_complete(false), 
+                                                                   compressed(false), 
+                                                                   bucket_info(_bi), 
+                                                                   canceled(false),
+                                                                   compression_enabled(false) {}
   virtual ~RGWPutObjProcessor() {}
   virtual int prepare(RGWRados *_store, string *oid_rand) {
     store = _store;
@@ -3315,6 +3325,7 @@ public:
                        map<string, bufferlist>& attrs, ceph::real_time delete_at,
                        const char *if_match = NULL, const char *if_nomatch = NULL);
 
+  bool compression_enabled;
   CephContext *ctx();
 
   bool is_canceled() { return canceled; }
index d19ff7e01ede27022db0e223b3147afc816e02a1..b482a2ace500804b5a8f6ea8b376dd20eb1c4b2d 100644 (file)
@@ -1240,7 +1240,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl,
                                                  const off_t bl_ofs,
                                                  const off_t bl_len)
 {
-  string content_type;
+  string content_type; int rr;
 
   if (sent_header) {
     goto send_data;
@@ -1272,7 +1272,6 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl,
   dump_content_length(s, total_len);
   dump_last_modified(s, lastmod);
   dump_header(s, "X-Timestamp", utime_t(lastmod));
-
   if (is_slo) {
     dump_header(s, "X-Static-Large-Object", "True");
   }