From: Ved-vampir Date: Thu, 28 Apr 2016 15:35:47 +0000 (+0300) Subject: rgw: compression: add multipart and copy support X-Git-Tag: v11.1.0~429^2~35 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=89174520f6d7556de6580fef5a2f0093f0fd4a79;p=ceph.git rgw: compression: add multipart and copy support Signed-off-by: Alyona Kiseleva --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 437d5e5a35d0..def03bebe5ff 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1290,14 +1290,14 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) while (first_block <= last_block) { bufferlist tmp, tmp_out; int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs; - if (ofs_in_bl + cs_info.blocks[first_block].len > bl.length()) { + if (ofs_in_bl + cs_info.blocks[first_block].len > bl_len) { // not complete block, put it to waiting - int tail = bl.length() - ofs_in_bl; - bl.copy(ofs_in_bl, tail, waiting); + int tail = bl_len - ofs_in_bl; + in_bl.copy(ofs_in_bl, tail, waiting); cur_ofs -= tail; break; } - bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp); + in_bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp); int cr = compressor->decompress(tmp, tmp_out); if (cr < 0) { lderr(s->cct) << "Compression failed with exit code " << cr << dendl; @@ -1317,7 +1317,7 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) first_data = false; cur_ofs += bl_len; - return send_response_data(out_bl, bl_ofs, out_bl.length()); + return send_response_data(out_bl, bl_ofs, out_bl.length() - bl_ofs); } } // end of compression stuff @@ -1388,6 +1388,7 @@ void RGWGetObj::execute() perfcounter->inc(l_rgw_get); int64_t new_ofs, new_end; + int cret; RGWRados::Object op_target(store, s->bucket_info, *static_cast(s->obj_ctx), obj); RGWRados::Object::Read read_op(&op_target); @@ -1417,8 +1418,55 @@ void RGWGetObj::execute() read_op.params.perr = &s->err; op_ret = read_op.prepare(&new_ofs, &new_end); - if (op_ret < 0) + if (op_ret < 0 && op_ret != -ERANGE) // check erange error later + goto done_err; + + cret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info); + if (cret < 0) { + lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl; + if (op_ret == 0) + op_ret = cret; goto done_err; + } + + if (op_ret == -ERANGE && !need_decompress) + goto done_err; + + if (need_decompress) { + op_ret = 0; + s->obj_size = cs_info.orig_size; + + if (partial_content) { + // recheck user range for correctness + if (ofs < 0) { + ofs += cs_info.orig_size; + if (ofs < 0) + ofs = 0; + end = cs_info.orig_size - 1; + } else if (end < 0) { + end = cs_info.orig_size - 1; + } + + if (ofs >= cs_info.orig_size) { + lderr(s->cct) << "ERROR: begin of the bytes range more than object size (" << cs_info.orig_size + << ")" << dendl; + op_ret = -ERANGE; + goto done_err; + } else + new_ofs = ofs; + + if (end >= cs_info.orig_size) { + ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size + << ")" << dendl; + new_end = cs_info.orig_size - 1; + } else + new_end = end; + + total_len = new_end - new_ofs + 1; + + } else + total_len = cs_info.orig_size; + } // for range requests with obj size 0 if (range_str && !(s->obj_size)) { @@ -1471,16 +1519,6 @@ void RGWGetObj::execute() return; } - op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info); - if (op_ret < 0) { - lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl; - goto done_err; - } - if (need_decompress && !partial_content) { - total_len = cs_info.orig_size; - } - - /* Check whether the object has expired. Swift API documentation * stands that we should return 404 Not Found in such case. */ if (need_object_expiration() && object_is_expired(attrs)) { @@ -1494,7 +1532,15 @@ void RGWGetObj::execute() start = ofs; if (need_decompress) { + + if (cs_info.blocks.size() == 0) { + lderr(s->cct) << "ERROR: no info about compression blocks, cannot decompress" << dendl; + op_ret = -EIO; + goto done_err; + } + if (partial_content) { + // if user set range, we need to calculate it in decompressed data first_block = 0; last_block = 0; if (cs_info.blocks.size() > 1) { @@ -1508,27 +1554,15 @@ void RGWGetObj::execute() first_block = 0; last_block = cs_info.blocks.size() - 1; } - ofs = cs_info.blocks[first_block].new_ofs; - end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len; - - // check user range for correctness - if (new_ofs < 0) { - ldout(s->cct, 5) << "WARNING: uncorrect begin of the bytes range, get 0 instead" << dendl; - new_ofs = 0; - } - if (new_end >= cs_info.orig_size) { - ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size - << ")" << dendl; - new_end = cs_info.orig_size - 1; - } - q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs; q_len = new_end - cs_info.blocks[last_block].old_ofs + 1; + new_ofs = cs_info.blocks[first_block].new_ofs; + new_end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len; + first_data = true; - cur_ofs = ofs; + cur_ofs = new_ofs; waiting.clear(); - } /* STAT ops don't need data, and do no i/o */ @@ -1536,14 +1570,14 @@ void RGWGetObj::execute() return; } - if (!get_data || ofs > end) { + if (!get_data || new_ofs > new_end) { send_response_data(bl, 0, 0); return; } - perfcounter->inc(l_rgw_get_b, end - ofs); + perfcounter->inc(l_rgw_get_b, new_end - new_ofs); - op_ret = read_op.iterate(ofs, end, &cb); + op_ret = read_op.iterate(new_ofs, new_end, &cb); perfcounter->tinc(l_rgw_get_lat, (ceph_clock_now(s->cct) - start_time)); @@ -2761,6 +2795,12 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, real_time *mtime, re info.size = s->obj_size; info.modified = real_clock::now(); info.manifest = manifest; + if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) { + bool tmp; + RGWCompressionInfo cs_info; + rgw_compression_info_from_attrset(attrs, tmp, cs_info); + info.cs_info = cs_info; + } ::encode(info, bl); string multipart_meta_obj = mp.get_meta(); @@ -2957,6 +2997,7 @@ void RGWPutObj::execute() fst = copy_source_range_fst; lst = copy_source_range_lst; + processor->compression_enabled = s->cct->_conf->rgw_compression_type != "none"; do { bufferlist data_in; @@ -3079,7 +3120,6 @@ void RGWPutObj::execute() dout(10) << "v4 auth ok" << dendl; } - op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket, user_quota, bucket_quota, s->obj_size); if (op_ret < 0) { @@ -3228,6 +3268,8 @@ void RGWPostObj::execute() return; } + processor.compression_enabled = s->cct->_conf->rgw_compression_type != "none"; + while (data_pending) { bufferlist data; len = get_data(data); @@ -3240,6 +3282,7 @@ void RGWPostObj::execute() if (!len) break; + hash.Update((const byte *)data.c_str(), data.length()); op_ret = put_data_and_throttle(&processor, data, ofs, &hash, false); ofs += len; @@ -3263,7 +3306,6 @@ void RGWPostObj::execute() return; } - processor.complete_hash(&hash); hash.Final(m); buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); @@ -3280,6 +3322,16 @@ void RGWPostObj::execute() emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl)); } + if (processor.is_compressed()) { + bufferlist tmp; + RGWCompressionInfo cs_info; + cs_info.compression_type = s->cct->_conf->rgw_compression_type; + cs_info.orig_size = s->obj_size; + cs_info.blocks = processor.get_compression_blocks(); + ::encode(cs_info, tmp); + emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp)); + } + op_ret = processor.complete(etag, NULL, real_time(), attrs, delete_at); } @@ -4771,6 +4823,8 @@ void RGWCompleteMultipart::execute() int max_parts = 1000; int marker = 0; bool truncated; + RGWCompressionInfo cs_info; + bool compressed = false; uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size; @@ -4852,6 +4906,32 @@ void RGWCompleteMultipart::execute() manifest.append(obj_part.manifest); } + if (obj_part.cs_info.compression_type != "none") { + if (compressed && cs_info.compression_type != obj_part.cs_info.compression_type) { + ldout(s->cct, 0) << "ERROR: compression type was changed during multipart upload (" + << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl; + op_ret = -ERR_INVALID_PART; + return; + } + int new_ofs; // offset in compression data for new part + if (cs_info.blocks.size() > 0) + new_ofs = cs_info.blocks[cs_info.blocks.size() - 1].new_ofs + cs_info.blocks[cs_info.blocks.size() - 1].len; + else + new_ofs = 0; + for (off_t i=0; i < obj_part.cs_info.blocks.size(); ++i) { + compression_block cb; + cb.old_ofs = obj_part.cs_info.blocks[i].old_ofs + cs_info.orig_size; + cb.new_ofs = new_ofs; + cb.len = obj_part.cs_info.blocks[i].len; + cs_info.blocks.push_back(cb); + new_ofs = cb.new_ofs + cb.len; + } + if (!compressed) + cs_info.compression_type = obj_part.cs_info.compression_type; + cs_info.orig_size += obj_part.cs_info.orig_size; + compressed = true; + } + rgw_obj_key remove_key; src_obj.get_index_key(&remove_key); @@ -4872,6 +4952,13 @@ void RGWCompleteMultipart::execute() attrs[RGW_ATTR_ETAG] = etag_bl; + if (compressed) { + // write compression attribute to full object + bufferlist tmp; + ::encode(cs_info, tmp); + attrs[RGW_ATTR_COMPRESSION] = tmp; + } + target_obj.init(s->bucket, s->object.name); if (versioned_object) { store->gen_rand_obj_instance_name(&target_obj); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index cea6d3cfc8f8..75df21a363b4 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2370,7 +2370,6 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, bufferlist in_bl; // compression stuff - bool compression_enabled = store->ctx()->_conf->rgw_compression_type != "none"; if ((ofs > 0 && compressed) || // if previous part was compressed (ofs == 0 && compression_enabled)) { // or it's the first part and flag is set ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; @@ -2414,7 +2413,6 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, } // end of compression stuff - if (extra_data_len) { size_t extra_len = in_bl.length(); if (extra_len > extra_data_len) @@ -7393,6 +7391,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx, attrs.erase(RGW_ATTR_ID_TAG); attrs.erase(RGW_ATTR_PG_VER); attrs.erase(RGW_ATTR_SOURCE_ZONE); + if (src_attrs.find(RGW_ATTR_COMPRESSION) != src_attrs.end()) + attrs[RGW_ATTR_COMPRESSION] = src_attrs[RGW_ATTR_COMPRESSION]; RGWObjManifest manifest; RGWObjState *astate = NULL; @@ -9140,9 +9140,11 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend) end = astate->size - 1; } + int ret = 0; + if (astate->size > 0) { if (ofs >= (off_t)astate->size) { - return -ERANGE; + ret = -ERANGE; } if (end >= (off_t)astate->size) { end = astate->size - 1; @@ -9160,7 +9162,7 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend) if (params.lastmod) *params.lastmod = astate->mtime; - return 0; + return ret; } int RGWRados::SystemObject::get_state(RGWObjState **pstate, RGWObjVersionTracker *objv_tracker) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 4c67b4a02b55..2ea19ce4addc 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -678,26 +678,30 @@ struct RGWUploadPartInfo { string etag; ceph::real_time modified; RGWObjManifest manifest; + RGWCompressionInfo cs_info; RGWUploadPartInfo() : num(0), size(0) {} void encode(bufferlist& bl) const { - ENCODE_START(3, 2, bl); + ENCODE_START(4, 2, bl); ::encode(num, bl); ::encode(size, bl); ::encode(etag, bl); ::encode(modified, bl); ::encode(manifest, bl); + ::encode(cs_info, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl); ::decode(num, bl); ::decode(size, bl); ::decode(etag, bl); ::decode(modified, bl); if (struct_v >= 3) ::decode(manifest, bl); + if (struct_v >= 4) + ::decode(cs_info, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -3300,7 +3304,13 @@ protected: const char *if_match = NULL, const char *if_nomatch = NULL) = 0; public: - RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), compressed(false), bucket_info(_bi), canceled(false) {} + RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), + obj_ctx(_obj_ctx), + is_complete(false), + compressed(false), + bucket_info(_bi), + canceled(false), + compression_enabled(false) {} virtual ~RGWPutObjProcessor() {} virtual int prepare(RGWRados *_store, string *oid_rand) { store = _store; @@ -3315,6 +3325,7 @@ public: map& attrs, ceph::real_time delete_at, const char *if_match = NULL, const char *if_nomatch = NULL); + bool compression_enabled; CephContext *ctx(); bool is_canceled() { return canceled; } diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index d19ff7e01ede..b482a2ace500 100644 --- a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -1240,7 +1240,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, const off_t bl_ofs, const off_t bl_len) { - string content_type; + string content_type; int rr; if (sent_header) { goto send_data; @@ -1272,7 +1272,6 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, dump_content_length(s, total_len); dump_last_modified(s, lastmod); dump_header(s, "X-Timestamp", utime_t(lastmod)); - if (is_slo) { dump_header(s, "X-Static-Large-Object", "True"); }