From 8a1ce0fb8ec8bca18b68af34a81772d0aebf45e7 Mon Sep 17 00:00:00 2001 From: Ved-vampir Date: Wed, 13 Apr 2016 14:35:27 +0300 Subject: [PATCH] rgw: add compression support for ranged queries Signed-off-by: Alyona Kiseleva --- src/rgw/rgw_op.cc | 89 ++++++++++++++++++++++++++++++++++++++++---- src/rgw/rgw_op.h | 12 ++++++ src/rgw/rgw_rados.cc | 26 +++++++++++-- src/rgw/rgw_rados.h | 28 ++++++++++++++ src/vstart.sh | 6 +++ 5 files changed, 151 insertions(+), 10 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index e5f2366dad75f..437d5e5a35d07 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1267,7 +1267,8 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) } // compression stuff if (need_decompress) { - ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part" << dendl; + ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl; + CompressorRef compressor = Compressor::create(s->cct, cs_info.compression_type); if (!compressor.get()) { // if compressor isn't available - error, because cannot return decompressed data? @@ -1275,12 +1276,47 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) << "for rgw, check rgw_compression_type config option" << dendl; return -EIO; } else { - bufferlist out_bl; - int cr = compressor->decompress(bl, out_bl); - if (cr < 0) { - lderr(s->cct) << "Compression failed with exit code " << cr << dendl; - return cr; + bufferlist out_bl, in_bl; + bl_ofs = 0; + if (waiting.length() != 0) { + in_bl.append(waiting); + in_bl.append(bl); + waiting.clear(); + } else { + in_bl.claim(bl); + } + bl_len = in_bl.length(); + + while (first_block <= last_block) { + bufferlist tmp, tmp_out; + int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs; + if (ofs_in_bl + cs_info.blocks[first_block].len > bl.length()) { + // not complete block, put it to waiting + int tail = bl.length() - ofs_in_bl; + bl.copy(ofs_in_bl, tail, waiting); + cur_ofs -= tail; + break; + } + bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp); + int cr = compressor->decompress(tmp, tmp_out); + if (cr < 0) { + lderr(s->cct) << "Compression failed with exit code " << cr << dendl; + return cr; + } + if (first_block == last_block && partial_content) + out_bl.append(tmp_out.c_str(), q_len); + else + out_bl.append(tmp_out); + first_block++; } + + if (first_data && partial_content && out_bl.length() != 0) + bl_ofs = q_ofs; + + if (first_data && out_bl.length() != 0) + first_data = false; + + cur_ofs += bl_len; return send_response_data(out_bl, bl_ofs, out_bl.length()); } } @@ -1440,7 +1476,7 @@ void RGWGetObj::execute() lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl; goto done_err; } - if (need_decompress) { + if (need_decompress && !partial_content) { total_len = cs_info.orig_size; } @@ -1457,6 +1493,44 @@ void RGWGetObj::execute() start = ofs; + if (need_decompress) { + if (partial_content) { + // if user set range, we need to calculate it in decompressed data + first_block = 0; last_block = 0; + if (cs_info.blocks.size() > 1) { + off_t i = 1; + while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs <= new_ofs) i++; + first_block = i - 1; + while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs < new_end) i++; + last_block = i - 1; + } + } else { + first_block = 0; last_block = cs_info.blocks.size() - 1; + } + + ofs = cs_info.blocks[first_block].new_ofs; + end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len; + + // check user range for correctness + if (new_ofs < 0) { + ldout(s->cct, 5) << "WARNING: uncorrect begin of the bytes range, get 0 instead" << dendl; + new_ofs = 0; + } + if (new_end >= cs_info.orig_size) { + ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size + << ")" << dendl; + new_end = cs_info.orig_size - 1; + } + + q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs; + q_len = new_end - cs_info.blocks[last_block].old_ofs + 1; + + first_data = true; + cur_ofs = ofs; + waiting.clear(); + + } + /* STAT ops don't need data, and do no i/o */ if (get_type() == RGW_OP_STAT_OBJ) { return; @@ -3020,6 +3094,7 @@ void RGWPutObj::execute() RGWCompressionInfo cs_info; cs_info.compression_type = s->cct->_conf->rgw_compression_type; cs_info.orig_size = s->obj_size; + cs_info.blocks = processor->get_compression_blocks(); ::encode(cs_info, tmp); attrs[RGW_ATTR_COMPRESSION] = tmp; } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 9e9eef14873ce..2431b7cf58503 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -138,6 +138,11 @@ protected: // compression attrs RGWCompressionInfo cs_info; bool need_decompress; + off_t first_block, last_block; + off_t q_ofs, q_len; + bool first_data; + uint64_t cur_ofs; + bufferlist waiting; int init_common(); public: @@ -161,6 +166,13 @@ public: skip_manifest = false; is_slo = false; rgwx_stat = false; + need_decompress = false; + first_block = 0; + last_block = 0; + q_ofs = 0; + q_len = 0; + first_data = true; + cur_ofs = 0; } bool prefetch_data(); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index abe23089de158..cea6d3cfc8f8d 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2373,9 +2373,14 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, bool compression_enabled = store->ctx()->_conf->rgw_compression_type != "none"; if ((ofs > 0 && compressed) || // if previous part was compressed (ofs == 0 && compression_enabled)) { // or it's the first part and flag is set - ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part" << dendl; + ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type); if (!compressor.get()) { + if (ofs > 0 && compressed) { + lderr(store->ctx()) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type + << " for next part, compression process failed" << dendl; + return -EIO; + } // if compressor isn't available - just do not use it with log warning? ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type << "for rgw, check rgw_compression_type config option" << dendl; @@ -2383,12 +2388,24 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, in_bl.claim(bl); } else { int cr = compressor->compress(bl, in_bl); - if (cr != 0) { + if (cr < 0) { + if (ofs > 0 && compressed) { + lderr(store->ctx()) << "Compression failed with exit code " << cr + << " for next part, compression process failed" << dendl; + return -EIO; + } ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl; compressed = false; in_bl.claim(bl); } else { compressed = true; + + compression_block newbl; + int bs = blocks.size(); + newbl.old_ofs = ofs; + newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; + newbl.len = in_bl.length(); + blocks.push_back(newbl); } } } else { @@ -12893,7 +12910,10 @@ int rgw_compression_info_from_attrset(map& attrs, bool& need } catch (buffer::error& err) { return -EIO; } - need_decompress = true; + if (cs_info.compression_type != "none") + need_decompress = true; + else + need_decompress = false; return 0; } else { need_decompress = false; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 6b42c97e4f42a..4c67b4a02b555 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -76,9 +76,33 @@ static inline void get_obj_bucket_and_oid_loc(const rgw_obj& obj, rgw_bucket& bu int rgw_policy_from_attrset(CephContext *cct, map& attrset, RGWAccessControlPolicy *policy); +struct compression_block { + uint64_t old_ofs; + uint64_t new_ofs; + uint64_t len; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(old_ofs, bl); + ::encode(new_ofs, bl); + ::encode(len, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(old_ofs, bl); + ::decode(new_ofs, bl); + ::decode(len, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(compression_block) + struct RGWCompressionInfo { string compression_type; uint64_t orig_size; + vector blocks; RGWCompressionInfo() : compression_type("none"), orig_size(0) {} @@ -86,6 +110,7 @@ struct RGWCompressionInfo { ENCODE_START(1, 1, bl); ::encode(compression_type, bl); ::encode(orig_size, bl); + ::encode(blocks, bl); ENCODE_FINISH(bl); } @@ -93,6 +118,7 @@ struct RGWCompressionInfo { DECODE_START(1, bl); ::decode(compression_type, bl); ::decode(orig_size, bl); + ::decode(blocks, bl); DECODE_FINISH(bl); } }; @@ -3267,6 +3293,7 @@ protected: bool compressed; RGWBucketInfo bucket_info; bool canceled; + vector blocks; virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime, map& attrs, ceph::real_time delete_at, @@ -3292,6 +3319,7 @@ public: bool is_canceled() { return canceled; } bool is_compressed() { return compressed; } + const vector& get_compression_blocks() { return blocks; } }; /* RGWPutObjProcessor */ struct put_obj_aio_info { diff --git a/src/vstart.sh b/src/vstart.sh index cf322b9eee5ca..a829809cf75fe 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -16,6 +16,12 @@ if [ -n "$VSTART_DEST" ]; then CEPH_OUT_DIR=$VSTART_DEST/out fi +mkdir -p .libs/compressor +for f in `ls -d compressor/*/`; +do + cp .libs/libceph_`basename $f`.so* .libs/compressor/; +done + # for running out of the CMake build directory if [ -e CMakeCache.txt ]; then # Out of tree build, learn source location from CMakeCache.txt -- 2.39.5