From cd0cc75a8634afe0e1b3383cc0c514e34e4952fc Mon Sep 17 00:00:00 2001 From: Ved-vampir Date: Thu, 30 Jun 2016 15:55:16 +0300 Subject: [PATCH] rgw: move compression code in filters system Signed-off-by: Alyona Kiseleva --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_compression.cc | 173 +++++++++++++++++++++ src/rgw/rgw_compression.h | 51 +++++++ src/rgw/rgw_op.cc | 297 +++++++++++++++++-------------------- src/rgw/rgw_op.h | 42 +++--- src/rgw/rgw_rados.cc | 96 +++++------- src/rgw/rgw_rados.h | 35 +++-- src/rgw/rgw_rest_swift.cc | 2 +- src/spdk | 2 +- src/vstart.sh | 6 - src/xxHash | 2 +- 11 files changed, 440 insertions(+), 267 deletions(-) create mode 100644 src/rgw/rgw_compression.cc create mode 100644 src/rgw/rgw_compression.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 20984a0adc9b4..bc70a83dffb53 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -29,6 +29,7 @@ set(rgw_a_srcs rgw_cache.cc rgw_client_io.cc rgw_common.cc + rgw_compression.cc rgw_cors.cc rgw_cors_s3.cc rgw_dencoder.cc diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc new file mode 100644 index 0000000000000..b58c0cd94e64a --- /dev/null +++ b/src/rgw/rgw_compression.cc @@ -0,0 +1,173 @@ + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rgw_compression.h" + +#define dout_subsys ceph_subsys_rgw + +//------------RGWPutObj_Compress--------------- + +int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) +{ + bufferlist in_bl; + + if (bl.length() > 0) { + // compression stuff + if ((ofs > 0 && compressed) || // if previous part was compressed + (ofs == 0)) { // or it's the first part + ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; + CompressorRef compressor = Compressor::create(cct, cct->_conf->rgw_compression_type); + if (!compressor.get()) { + if (ofs > 0 && compressed) { + 
lderr(cct) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type + << " for next part, compression process failed" << dendl; + return -EIO; + } + // if compressor isn't available - just do not use it with log warning? + ldout(cct, 5) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type + << " for rgw, check rgw_compression_type config option" << dendl; + compressed = false; + in_bl.claim(bl); + } else { + int cr = compressor->compress(bl, in_bl); + if (cr < 0) { + if (ofs > 0 && compressed) { + lderr(cct) << "Compression failed with exit code " << cr + << " for next part, compression process failed" << dendl; + return -EIO; + } + ldout(cct, 5) << "Compression failed with exit code " << cr << dendl; + compressed = false; + in_bl.claim(bl); + } else { + compressed = true; + + compression_block newbl; + int bs = blocks.size(); + newbl.old_ofs = ofs; + newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; + newbl.len = in_bl.length(); + blocks.push_back(newbl); + } + } + } else { + compressed = false; + in_bl.claim(bl); + } + // end of compression stuff + } + + return next->handle_data(in_bl, ofs, phandle, pobj, again); +} + +//----------------RGWGetObj_Decompress--------------------- +RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, + RGWCompressionInfo* cs_info_, + bool partial_content_, + RGWGetDataCB* next): RGWGetObj_Filter(next), + cct(cct_), + cs_info(cs_info_), + partial_content(partial_content_), + q_ofs(0), + q_len(0), + first_data(true), + cur_ofs(0) +{ + compressor = Compressor::create(cct, cs_info->compression_type); + if (!compressor.get()) + lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type + << " for rgw, check rgw_compression_type config option" << dendl; +} + +int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) +{ + ldout(cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl; + + if 
(!compressor.get()) { + // if compressor isn't available - error, because cannot return decompressed data? + lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type + << " for rgw, check rgw_compression_type config option" << dendl; + return -EIO; + } + bufferlist out_bl, in_bl; + bl_ofs = 0; + if (waiting.length() != 0) { + in_bl.append(waiting); + in_bl.append(bl); + waiting.clear(); + } else { + in_bl.claim(bl); + } + bl_len = in_bl.length(); + + while (first_block <= last_block) { + bufferlist tmp, tmp_out; + int ofs_in_bl = first_block->new_ofs - cur_ofs; + if (ofs_in_bl + (unsigned)first_block->len > bl_len) { + // not complete block, put it to waiting + int tail = bl_len - ofs_in_bl; + in_bl.copy(ofs_in_bl, tail, waiting); + cur_ofs -= tail; + break; + } + in_bl.copy(ofs_in_bl, first_block->len, tmp); + int cr = compressor->decompress(tmp, tmp_out); + if (cr < 0) { + lderr(cct) << "Compression failed with exit code " << cr << dendl; + return cr; + } + if (first_block == last_block && partial_content) + tmp_out.copy(0, q_len, out_bl); + else + out_bl.append(tmp_out); + first_block++; + } + + if (first_data && partial_content && out_bl.length() != 0) + bl_ofs = q_ofs; + + if (first_data && out_bl.length() != 0) + first_data = false; + + cur_ofs += bl_len; + return next->handle_data(out_bl, bl_ofs, out_bl.length() - bl_ofs); +} + +void RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end) +{ + if (partial_content) { + // if user set range, we need to calculate it in decompressed data + first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin(); + if (cs_info->blocks.size() > 1) { + vector::iterator fb, lb; + // not bad to use auto for lambda, I think + auto cmp_u = [] (off_t ofs, const compression_block& e) { return (unsigned)ofs < e.old_ofs; }; + auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs < (unsigned)ofs; }; + fb = upper_bound(cs_info->blocks.begin()+1, + cs_info->blocks.end(), + ofs, + 
cmp_u); + first_block = fb - 1; + lb = lower_bound(fb, + cs_info->blocks.end(), + end, + cmp_l); + last_block = lb - 1; + } + } else { + first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1; + } + + q_ofs = ofs - first_block->old_ofs; + q_len = end - last_block->old_ofs + 1; + + ofs = first_block->new_ofs; + end = last_block->new_ofs + last_block->len; + + first_data = true; + cur_ofs = ofs; + waiting.clear(); + + next->fixup_range(ofs, end); +} diff --git a/src/rgw/rgw_compression.h b/src/rgw/rgw_compression.h new file mode 100644 index 0000000000000..41e4bf8f7b2be --- /dev/null +++ b/src/rgw/rgw_compression.h @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RGW_COMPRESSION_H +#define CEPH_RGW_COMPRESSION_H + +#include + +#include "compressor/Compressor.h" +#include "rgw_op.h" + +class RGWGetObj_Decompress : public RGWGetObj_Filter +{ + CephContext* cct; + CompressorRef compressor; + RGWCompressionInfo* cs_info; + bool partial_content; + vector::iterator first_block, last_block; + off_t q_ofs, q_len; + bool first_data; + uint64_t cur_ofs; + bufferlist waiting; +public: + RGWGetObj_Decompress(CephContext* cct_, + RGWCompressionInfo* cs_info_, + bool partial_content_, + RGWGetDataCB* next); + virtual ~RGWGetObj_Decompress() {} + + virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; + virtual void fixup_range(off_t& ofs, off_t& end) override; + +}; + +class RGWPutObj_Compress : public RGWPutObj_Filter +{ + CephContext* cct; + bool compressed; + std::vector blocks; +public: + RGWPutObj_Compress(CephContext* cct_, RGWPutObjDataProcessor* next) : RGWPutObj_Filter(next), + cct(cct_) {} + virtual ~RGWPutObj_Compress(){} + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override; + + bool is_compressed() { return compressed; } + vector& get_compression_blocks() { return blocks; } + +}; /* 
RGWPutObj_Compress */ + +#endif /* CEPH_RGW_COMPRESSION_H */ diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index da434a95c2522..70b3626d63b98 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -8,6 +8,8 @@ #include #include +#include +#include #include "common/Clock.h" #include "common/armor.h" @@ -30,9 +32,8 @@ #include "rgw_cors_s3.h" #include "rgw_rest_conn.h" #include "rgw_rest_s3.h" -#include "rgw_lc.h" -#include "rgw_lc_s3.h" #include "rgw_client_io.h" +#include "rgw_compression.h" #include "cls/lock/cls_lock_client.h" #include "cls/rgw/cls_rgw_client.h" @@ -774,6 +775,16 @@ bool RGWOp::generate_cors_headers(string& origin, string& method, string& header return true; } +int RGWOp::update_compressed_bucket_size(uint64_t obj_size, RGWBucketCompressionInfo& bucket_size) +{ + bucket_size.orig_size += obj_size; + bufferlist bs; + ::encode(bucket_size, bs); + s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs; + return store->put_bucket_instance_info(s->bucket_info, false, real_time(), + &s->bucket_attrs); +} + int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, const RGWObjEnt& ent, RGWAccessControlPolicy * const bucket_policy, @@ -1265,62 +1276,6 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) gc_invalidate_time = start_time; gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2); } - // compression stuff - if (need_decompress) { - ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl; - - CompressorRef compressor = Compressor::create(s->cct, cs_info.compression_type); - if (!compressor.get()) { - // if compressor isn't available - error, because cannot return decompressed data? 
- lderr(s->cct) << "Cannot load compressor of type " << cs_info.compression_type - << "for rgw, check rgw_compression_type config option" << dendl; - return -EIO; - } else { - bufferlist out_bl, in_bl; - bl_ofs = 0; - if (waiting.length() != 0) { - in_bl.append(waiting); - in_bl.append(bl); - waiting.clear(); - } else { - in_bl.claim(bl); - } - bl_len = in_bl.length(); - - while (first_block <= last_block) { - bufferlist tmp, tmp_out; - int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs; - if (ofs_in_bl + cs_info.blocks[first_block].len > bl_len) { - // not complete block, put it to waiting - int tail = bl_len - ofs_in_bl; - in_bl.copy(ofs_in_bl, tail, waiting); - cur_ofs -= tail; - break; - } - in_bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp); - int cr = compressor->decompress(tmp, tmp_out); - if (cr < 0) { - lderr(s->cct) << "Compression failed with exit code " << cr << dendl; - return cr; - } - if (first_block == last_block && partial_content) - out_bl.append(tmp_out.c_str(), q_len); - else - out_bl.append(tmp_out); - first_block++; - } - - if (first_data && partial_content && out_bl.length() != 0) - bl_ofs = q_ofs; - - if (first_data && out_bl.length() != 0) - first_data = false; - - cur_ofs += bl_len; - return send_response_data(out_bl, bl_ofs, out_bl.length() - bl_ofs); - } - } - // end of compression stuff return send_response_data(bl, bl_ofs, bl_len); } @@ -1383,7 +1338,8 @@ void RGWGetObj::execute() gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2); RGWGetObj_CB cb(this); - RGWGetDataCB* decrypt = nullptr; + RGWGetDataCB* filter = (RGWGetDataCB*)&cb; + boost::optional decompress; map::iterator attr_iter; perfcounter->inc(l_rgw_get); @@ -1447,7 +1403,7 @@ void RGWGetObj::execute() end = cs_info.orig_size - 1; } - if (ofs >= cs_info.orig_size) { + if ((unsigned)ofs >= cs_info.orig_size) { lderr(s->cct) << "ERROR: begin of the bytes range more than object size (" << cs_info.orig_size << ")" << dendl; op_ret = -ERANGE; @@ 
-1455,7 +1411,7 @@ void RGWGetObj::execute() } else new_ofs = ofs; - if (end >= cs_info.orig_size) { + if ((unsigned)end >= cs_info.orig_size) { ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size << ")" << dendl; new_end = cs_info.orig_size - 1; @@ -1539,30 +1495,8 @@ void RGWGetObj::execute() goto done_err; } - if (partial_content) { - - // if user set range, we need to calculate it in decompressed data - first_block = 0; last_block = 0; - if (cs_info.blocks.size() > 1) { - off_t i = 1; - while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs <= new_ofs) i++; - first_block = i - 1; - while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs < new_end) i++; - last_block = i - 1; - } - } else { - first_block = 0; last_block = cs_info.blocks.size() - 1; - } - - q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs; - q_len = new_end - cs_info.blocks[last_block].old_ofs + 1; - - new_ofs = cs_info.blocks[first_block].new_ofs; - new_end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len; - - first_data = true; - cur_ofs = new_ofs; - waiting.clear(); + decompress = boost::in_place(s->cct, &cs_info, partial_content, filter); + filter = &*decompress; } /* STAT ops don't need data, and do no i/o */ @@ -1577,21 +1511,10 @@ void RGWGetObj::execute() perfcounter->inc(l_rgw_get_b, new_end - new_ofs); - op_ret = this->get_decrypt_filter(&decrypt, cb); - if (op_ret < 0) { - goto done_err; - } - if (decrypt != nullptr) { - off_t tmp_ofs = ofs; - off_t tmp_end = end; - decrypt->fixup_range(tmp_ofs, tmp_end); - op_ret = read_op.iterate(tmp_ofs, tmp_end, decrypt); - if (op_ret >= 0) - op_ret = decrypt->flush(); - delete decrypt; - } - else - op_ret = read_op.iterate(ofs, end, &cb); + filter->fixup_range(new_ofs, new_end); + op_ret = read_op.iterate(new_ofs, new_end, filter); + if (op_ret >= 0) + op_ret = filter->flush(); perfcounter->tinc(l_rgw_get_lat, (ceph_clock_now(s->cct) - start_time)); @@ -2006,9 
+1929,12 @@ void RGWStatBucket::execute() op_ret = -EINVAL; } } - if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) { - ::decode(bucket.size, s->bucket_attrs[RGW_ATTR_COMPRESSION]); - } + RGWBucketCompressionInfo bcs; + int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bcs); + if (!res) + bucket.size = bcs.orig_size; + if (res < 0) + lderr(s->cct) << "Failed to read decompressed bucket size" << dendl; } int RGWListBucket::verify_permission() @@ -2812,12 +2738,14 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, real_time *mtime, re info.size = s->obj_size; info.modified = real_clock::now(); info.manifest = manifest; - if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) { - bool tmp; - RGWCompressionInfo cs_info; - rgw_compression_info_from_attrset(attrs, tmp, cs_info); - info.cs_info = cs_info; + + bool compressed; + r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); + if (r < 0) { + dout(1) << "cannot get compression info" << dendl; + return r; } + ::encode(info, bl); string multipart_meta_obj = mp.get_meta(); @@ -2854,7 +2782,7 @@ RGWPutObjProcessor *RGWPutObj::select_processor(RGWObjectCtx& obj_ctx, bool *is_ return processor; } -void RGWPutObj::dispose_processor(RGWPutObjProcessor *processor) +void RGWPutObj::dispose_processor(RGWPutObjDataProcessor *processor) { delete processor; } @@ -2920,6 +2848,7 @@ int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl) void RGWPutObj::execute() { RGWPutObjProcessor *processor = NULL; + RGWPutObjDataProcessor *filter = NULL; char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1]; char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; @@ -2933,6 +2862,10 @@ void RGWPutObj::execute() off_t fst; off_t lst; uint64_t bucket_size = 0; + RGWBucketCompressionInfo bucket_size; + int res; + bool compression_enabled; + boost::optional compressor; bool need_calc_md5 = (dlo_manifest == NULL) 
&& (slo_info == NULL); @@ -2993,6 +2926,9 @@ void RGWPutObj::execute() processor = select_processor(*static_cast(s->obj_ctx), &multipart); + // no filters by default + filter = processor; + /* Handle object versioning of Swift API. */ if (! multipart) { rgw_obj obj(s->bucket, s->object); @@ -3014,7 +2950,11 @@ void RGWPutObj::execute() fst = copy_source_range_fst; lst = copy_source_range_lst; - processor->compression_enabled = s->cct->_conf->rgw_compression_type != "none"; + compression_enabled = s->cct->_conf->rgw_compression_type != "none"; + if (compression_enabled) { + compressor = boost::in_place(s->cct, filter); + filter = &*compressor; + } do { bufferlist data_in; @@ -3061,7 +3001,7 @@ void RGWPutObj::execute() orig_data = data; } - op_ret = put_data_and_throttle(processor, data, ofs, need_to_wait); + op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait); if (op_ret < 0) { if (!need_to_wait || op_ret != -EEXIST) { ldout(s->cct, 20) << "processor->thottle_data() returned ret=" @@ -3079,6 +3019,8 @@ void RGWPutObj::execute() dispose_processor(processor); processor = select_processor(*static_cast(s->obj_ctx), &multipart); + filter = processor; + string oid_rand; char buf[33]; gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); @@ -3091,7 +3033,12 @@ void RGWPutObj::execute() goto done; } - op_ret = put_data_and_throttle(processor, data, ofs, false); + if (compression_enabled) { + compressor = boost::in_place(s->cct, filter); + filter = &*compressor; + } + + op_ret = put_data_and_throttle(filter, data, ofs, false); if (op_ret < 0) { goto done; } @@ -3143,24 +3090,21 @@ void RGWPutObj::execute() hash.Final(m); - if (processor->is_compressed()) { + if (compression_enabled && compressor->is_compressed()) { bufferlist tmp; RGWCompressionInfo cs_info; cs_info.compression_type = s->cct->_conf->rgw_compression_type; cs_info.orig_size = s->obj_size; - cs_info.blocks = processor->get_compression_blocks(); + cs_info.blocks = 
move(compressor->get_compression_blocks()); ::encode(cs_info, tmp); attrs[RGW_ATTR_COMPRESSION] = tmp; } - // add attr to bucket to know original size of data - if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) - ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]); - bucket_size += s->obj_size; - ::encode(bucket_size, bs); - s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs; - op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), - &s->bucket_attrs); + // add attr to bucket to know original size of data + res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size); + if (res < 0) + lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl; + op_ret = update_compressed_bucket_size(s->obj_size, bucket_size); if (op_ret < 0) ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl; @@ -3244,6 +3188,22 @@ int RGWPostObj::verify_permission() return 0; } +RGWPutObjProcessor *RGWPostObj::select_processor(RGWObjectCtx& obj_ctx) +{ + RGWPutObjProcessor *processor; + + uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size; + + processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled()); + + return processor; +} + +void RGWPostObj::dispose_processor(RGWPutObjDataProcessor *processor) +{ + delete processor; +} + void RGWPostObj::pre_exec() { rgw_bucket_object_pre_exec(s); @@ -3251,11 +3211,14 @@ void RGWPostObj::pre_exec() void RGWPostObj::execute() { + RGWPutObjDataProcessor *filter = NULL; char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; MD5 hash; buffer::list bl, aclbl; int len = 0; + bool compression_enabled; + boost::optional compressor; // read in the data from the POST form op_ret = get_params(); @@ -3288,12 +3251,18 @@ void RGWPostObj::execute() s->req_id, 
s->bucket_info.versioning_enabled()); + // no filters by default + filter = &processor; + op_ret = processor.prepare(store, nullptr); - if (op_ret < 0) { + if (op_ret < 0) return; - } - processor.compression_enabled = s->cct->_conf->rgw_compression_type != "none"; + compression_enabled = s->cct->_conf->rgw_compression_type != "none"; + if (compression_enabled) { + compressor = boost::in_place(s->cct, filter); + filter = &*compressor; + } while (data_pending) { bufferlist data; @@ -3308,7 +3277,7 @@ void RGWPostObj::execute() break; hash.Update((const byte *)data.c_str(), data.length()); - op_ret = put_data_and_throttle(&processor, data, ofs, &hash, false); + op_ret = put_data_and_throttle(filter, data, ofs, false); ofs += len; @@ -3347,12 +3316,12 @@ void RGWPostObj::execute() emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl)); } - if (processor.is_compressed()) { + if (compression_enabled && compressor->is_compressed()) { bufferlist tmp; RGWCompressionInfo cs_info; cs_info.compression_type = s->cct->_conf->rgw_compression_type; cs_info.orig_size = s->obj_size; - cs_info.blocks = processor.get_compression_blocks(); + cs_info.blocks = move(compressor->get_compression_blocks()); ::encode(cs_info, tmp); emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp)); } @@ -3760,22 +3729,26 @@ void RGWDeleteObj::execute() if (op_ret >= 0) { delete_marker = del_op.result.delete_marker; version_id = del_op.result.version_id; - if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) { - uint64_t bucket_size, deleted_size; - ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]); - if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) { - bool tmp; - RGWCompressionInfo cs_info; - rgw_compression_info_from_attrset(attrs, tmp, cs_info); - deleted_size = cs_info.orig_size; - } else + RGWBucketCompressionInfo bucket_size; + int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size); + if (res < 0) + lderr(s->cct) << "ERROR: failed to read 
decompressed bucket size, cannot update stat" << dendl; + if (!res) { + uint64_t deleted_size; + bool tmp; + RGWCompressionInfo cs_info; + res = rgw_compression_info_from_attrset(attrs, tmp, cs_info); + if (res < 0) { + lderr(s->cct) << "ERROR: failed to decode compression info, cannot update stat" << dendl; deleted_size = s->obj_size; - bucket_size -= deleted_size; - bufferlist bs; - ::encode(bucket_size, bs); - s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs; - op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), - &s->bucket_attrs); + } else { + if (tmp) + deleted_size = cs_info.orig_size; + else + deleted_size = s->obj_size; + } + if (update_compressed_bucket_size(-deleted_size, bucket_size) < 0) + lderr(s->cct) << "ERROR: failed to update bucket stat" << dendl; } } @@ -4957,14 +4930,14 @@ void RGWCompleteMultipart::execute() } int new_ofs; // offset in compression data for new part if (cs_info.blocks.size() > 0) - new_ofs = cs_info.blocks[cs_info.blocks.size() - 1].new_ofs + cs_info.blocks[cs_info.blocks.size() - 1].len; + new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len; else new_ofs = 0; - for (off_t i=0; i < obj_part.cs_info.blocks.size(); ++i) { + for (const auto& block : obj_part.cs_info.blocks) { compression_block cb; - cb.old_ofs = obj_part.cs_info.blocks[i].old_ofs + cs_info.orig_size; + cb.old_ofs = block.old_ofs + cs_info.orig_size; cb.new_ofs = new_ofs; - cb.len = obj_part.cs_info.blocks[i].len; + cb.len = block.len; cs_info.blocks.push_back(cb); new_ofs = cb.new_ofs + cb.len; } @@ -5112,18 +5085,20 @@ void RGWAbortMultipart::execute() } map attrset; int y = get_obj_attrs(store, s, obj, attrset); - if (!y && attrset.find(RGW_ATTR_COMPRESSION) != attrset.end()) { + map::iterator cmp = attrset.find(RGW_ATTR_COMPRESSION); + if (!y && cmp != attrset.end()) { RGWCompressionInfo cs_info; - bufferlist::iterator bliter = attrset[RGW_ATTR_COMPRESSION].begin(); + bufferlist::iterator bliter = cmp->second.begin(); try { 
::decode(cs_info, bliter); + if (cs_info.compression_type != "none") + deleted_size += cs_info.orig_size; + else + deleted_size += obj_part.size; } catch (buffer::error& err) { ldout(s->cct, 5) << "Failed to get decompressed obj size" << dendl; - } - if (cs_info.compression_type != "none") - deleted_size += cs_info.orig_size; - else deleted_size += obj_part.size; + } } else deleted_size += obj_part.size; } @@ -5152,15 +5127,13 @@ void RGWAbortMultipart::execute() } if (!op_ret) { - if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) { - uint64_t bucket_size; - ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]); - bucket_size -= deleted_size; - bufferlist bs; - ::encode(bucket_size, bs); - s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs; - op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), - &s->bucket_attrs); + RGWBucketCompressionInfo bucket_size; + int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size); + if (res < 0) + lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl; + if (!res) { + if (update_compressed_bucket_size(-deleted_size, bucket_size) < 0) + lderr(s->cct) << "ERROR: failed to update bucket stat" << dendl; } } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 01e21d5f9752e..6d55692077953 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -19,6 +19,7 @@ #include #include +#include #include "common/armor.h" #include "common/mime.h" @@ -86,6 +87,8 @@ RGWOp() : s(nullptr), dialect_handler(nullptr), store(nullptr), } int read_bucket_cors(); bool generate_cors_headers(string& origin, string& method, string& headers, string& exp_headers, unsigned *max_age); + // obj_size can be positive or negative + int update_compressed_bucket_size(uint64_t obj_size, RGWBucketCompressionInfo& bucket_size); virtual int verify_params() { return 0; } virtual bool prefetch_data() { return false; } @@ -201,13 +204,6 @@ public: virtual RGWOpType 
get_type() { return RGW_OP_GET_OBJ; } virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; } virtual bool need_object_expiration() { return false; } - /** - * calculates filter used to decrypt RGW objects data - */ - virtual int get_decrypt_filter(RGWGetDataCB** filter, RGWGetDataCB& cb) { - *filter = NULL; - return 0; - } }; class RGWGetObj_CB : public RGWGetDataCB @@ -225,30 +221,30 @@ public: class RGWGetObj_Filter : public RGWGetDataCB { protected: - RGWGetDataCB& next; + RGWGetDataCB* next; public: - RGWGetObj_Filter(RGWGetDataCB& next): next(next) {} + RGWGetObj_Filter(RGWGetDataCB* next): next(next) {} virtual ~RGWGetObj_Filter() {} /** * Passes data through filter. * Filter can modify content of bl. * When bl_len == 0 , it means 'flush */ - virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { - return next.handle_data(bl, bl_ofs, bl_len); + virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + return next->handle_data(bl, bl_ofs, bl_len); } /** * Flushes any cached data. Used by RGWGetObjFilter. * Return logic same as handle_data. 
*/ - virtual int flush() { - return next.flush(); + virtual int flush() override { + return next->flush(); } /** * Allows filter to extend range required for successful filtering */ - virtual void fixup_range(off_t& bl_ofs, off_t& bl_len) { - next.fixup_range(bl_ofs, bl_len); + virtual void fixup_range(off_t& ofs, off_t& end) override { + next->fixup_range(ofs, end); } }; @@ -759,7 +755,7 @@ public: } virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart); - void dispose_processor(RGWPutObjProcessor *processor); + void dispose_processor(RGWPutObjDataProcessor *processor); int verify_permission(); void pre_exec(); @@ -779,16 +775,16 @@ public: class RGWPutObj_Filter : public RGWPutObjDataProcessor { protected: - RGWPutObjDataProcessor& next; + RGWPutObjDataProcessor* next; public: - RGWPutObj_Filter(RGWPutObjDataProcessor& next) : + RGWPutObj_Filter(RGWPutObjDataProcessor* next) : next(next){} - virtual ~RGWPutObj_Filter(){} - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) { - return next.handle_data(bl, ofs, phandle, again); + virtual ~RGWPutObj_Filter() {} + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override { + return next->handle_data(bl, ofs, phandle, pobj, again); } - virtual int throttle_data(void *handle, bool need_to_wait) { - return next.throttle_data(handle, need_to_wait); + virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override { + return next->throttle_data(handle, obj, need_to_wait); } }; /* RGWPutObj_Filter */ diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index ecdf104ea4fcf..1b46ddfc45f02 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -9,6 +9,7 @@ #include #include +#include #include "common/ceph_json.h" #include "common/utf8.h" @@ -2361,86 +2362,40 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan return 
RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) { *again = false; *phandle = NULL; - bufferlist in_bl; - - // compression stuff - if ((ofs > 0 && compressed) || // if previous part was compressed - (ofs == 0 && compression_enabled)) { // or it's the first part and flag is set - ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; - CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type); - if (!compressor.get()) { - if (ofs > 0 && compressed) { - lderr(store->ctx()) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type - << " for next part, compression process failed" << dendl; - return -EIO; - } - // if compressor isn't available - just do not use it with log warning? - ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type - << "for rgw, check rgw_compression_type config option" << dendl; - compressed = false; - in_bl.claim(bl); - } else { - int cr = compressor->compress(bl, in_bl); - if (cr < 0) { - if (ofs > 0 && compressed) { - lderr(store->ctx()) << "Compression failed with exit code " << cr - << " for next part, compression process failed" << dendl; - return -EIO; - } - ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl; - compressed = false; - in_bl.claim(bl); - } else { - compressed = true; - - compression_block newbl; - int bs = blocks.size(); - newbl.old_ofs = ofs; - newbl.new_ofs = bs > 0 ? 
blocks[bs-1].len + blocks[bs-1].new_ofs : 0; - newbl.len = in_bl.length(); - blocks.push_back(newbl); - } - } - } else { - compressed = false; - in_bl.claim(bl); - } - // end of compression stuff - if (extra_data_len) { - size_t extra_len = in_bl.length(); + size_t extra_len = bl.length(); if (extra_len > extra_data_len) extra_len = extra_data_len; bufferlist extra; - in_bl.splice(0, extra_len, &extra); + bl.splice(0, extra_len, &extra); extra_data_bl.append(extra); extra_data_len -= extra_len; - if (in_bl.length() == 0) { + if (bl.length() == 0) { return 0; } } uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs); - pending_data_bl.claim_append(in_bl); + pending_data_bl.claim_append(bl); if (pending_data_bl.length() < max_write_size) return 0; - pending_data_bl.splice(0, max_write_size, &in_bl); + pending_data_bl.splice(0, max_write_size, &bl); /* do we have enough data pending accumulated that needs to be written? */ *again = (pending_data_bl.length() >= max_chunk_size); if (!data_ofs && !immutable_head()) { - first_chunk.claim(in_bl); + first_chunk.claim(bl); obj_len = (uint64_t)first_chunk.length(); int r = prepare_next_part(obj_len); if (r < 0) { @@ -2450,13 +2405,12 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, return 0; } off_t write_ofs = data_ofs; - data_ofs = write_ofs + in_bl.length(); + data_ofs = write_ofs + bl.length(); bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there we could be racing with another upload, to the same object and cleanup can be messy */ - int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive); + int ret = write_data(bl, write_ofs, phandle, pobj, exclusive); if (ret >= 0) { /* we might return, need to clear bl as it was already sent */ - in_bl.clear(); bl.clear(); } return ret; @@ -5262,9 +5216,10 @@ int RGWRados::Bucket::List::list_objects(int max, vector *result, bufferlist attr; map attrset; 
int y = store->raw_obj_stat(cs_obj, NULL, NULL, NULL, &attrset, NULL, NULL); - if (!y && attrset.find(RGW_ATTR_COMPRESSION) != attrset.end()) { + map::iterator cmp = attrset.find(RGW_ATTR_COMPRESSION); + if (!y && cmp != attrset.end()) { RGWCompressionInfo cs_info; - bufferlist::iterator bliter = attrset[RGW_ATTR_COMPRESSION].begin(); + bufferlist::iterator bliter = cmp->second.begin(); try { ::decode(cs_info, bliter); } catch (buffer::error& err) { @@ -7401,8 +7356,9 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx, attrs.erase(RGW_ATTR_ID_TAG); attrs.erase(RGW_ATTR_PG_VER); attrs.erase(RGW_ATTR_SOURCE_ZONE); - if (src_attrs.find(RGW_ATTR_COMPRESSION) != src_attrs.end()) - attrs[RGW_ATTR_COMPRESSION] = src_attrs[RGW_ATTR_COMPRESSION]; + map::iterator cmp = src_attrs.find(RGW_ATTR_COMPRESSION); + if (cmp != src_attrs.end()) + attrs[RGW_ATTR_COMPRESSION] = cmp->second; RGWObjManifest manifest; RGWObjState *astate = NULL; @@ -12915,8 +12871,9 @@ int RGWRados::delete_obj_aio(rgw_obj& obj, rgw_bucket& bucket, } int rgw_compression_info_from_attrset(map& attrs, bool& need_decompress, RGWCompressionInfo& cs_info) { - if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) { - bufferlist::iterator bliter = attrs[RGW_ATTR_COMPRESSION].begin(); + map::iterator value = attrs.find(RGW_ATTR_COMPRESSION); + if (value != attrs.end()) { + bufferlist::iterator bliter = value->second.begin(); try { ::decode(cs_info, bliter); } catch (buffer::error& err) { @@ -12933,4 +12890,17 @@ int rgw_compression_info_from_attrset(map& attrs, bool& need } } - +int rgw_bucket_compression_info_from_attrset(map& attrs, RGWBucketCompressionInfo& cs_info) +{ + map::iterator cmp = attrs.find(RGW_ATTR_COMPRESSION); + if (cmp != attrs.end()) { + try { + ::decode(cs_info, cmp->second); + } catch (buffer::error& err) { + return -EIO; + } + } else { + return 1; + } + return 0; +} diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 7406ded0d3eac..e79f4a199e727 100644 --- a/src/rgw/rgw_rados.h +++ 
b/src/rgw/rgw_rados.h @@ -105,6 +105,9 @@ struct RGWCompressionInfo { vector blocks; RGWCompressionInfo() : compression_type("none"), orig_size(0) {} + RGWCompressionInfo(const RGWCompressionInfo& cs_info) : compression_type(cs_info.compression_type), + orig_size(cs_info.orig_size), + blocks(cs_info.blocks) {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -124,7 +127,28 @@ struct RGWCompressionInfo { }; WRITE_CLASS_ENCODER(RGWCompressionInfo) +struct RGWBucketCompressionInfo { + uint64_t orig_size; + + RGWBucketCompressionInfo() : orig_size(0) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(orig_size, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(orig_size, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(RGWBucketCompressionInfo) + int rgw_compression_info_from_attrset(map& attrs, bool& need_decompress, RGWCompressionInfo& cs_info); +// returns 0 if the compression attribute is found and decoded; -EIO if it is found but cannot be decoded; 1 if it is not found +int rgw_bucket_compression_info_from_attrset(map& attrs, RGWBucketCompressionInfo& cs_info); struct RGWOLHInfo { rgw_obj target; @@ -3319,10 +3343,8 @@ protected: RGWRados *store; RGWObjectCtx& obj_ctx; bool is_complete; - bool compressed; RGWBucketInfo bucket_info; bool canceled; - vector blocks; virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime, map& attrs, ceph::real_time delete_at, @@ -3332,28 +3354,21 @@ public: RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), - compressed(false), bucket_info(_bi), - canceled(false), - compression_enabled(false) {} + canceled(false) {} virtual ~RGWPutObjProcessor() {} virtual int prepare(RGWRados *_store, string *oid_rand) { store = _store; return 0; } - //virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again); - //virtual int throttle_data(void *handle, const 
rgw_obj& obj, bool need_to_wait) = 0; virtual int complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime, map& attrs, ceph::real_time delete_at, const char *if_match = NULL, const char *if_nomatch = NULL); - bool compression_enabled; CephContext *ctx(); bool is_canceled() { return canceled; } - bool is_compressed() { return compressed; } - const vector& get_compression_blocks() { return blocks; } }; /* RGWPutObjProcessor */ struct put_obj_aio_info { diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index b482a2ace5008..977a73b67f50c 100644 --- a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -1240,7 +1240,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, const off_t bl_ofs, const off_t bl_len) { - string content_type; int rr; + string content_type; if (sent_header) { goto send_data; diff --git a/src/spdk b/src/spdk index c02b179490123..9322c258084c6 160000 --- a/src/spdk +++ b/src/spdk @@ -1 +1 @@ -Subproject commit c02b179490123a3212b0c0d23b69da13965d1552 +Subproject commit 9322c258084c6abdeefe00067f8b310a6e0d9a5a diff --git a/src/vstart.sh b/src/vstart.sh index a829809cf75fe..cf322b9eee5ca 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -16,12 +16,6 @@ if [ -n "$VSTART_DEST" ]; then CEPH_OUT_DIR=$VSTART_DEST/out fi -mkdir -p .libs/compressor -for f in `ls -d compressor/*/`; -do - cp .libs/libceph_`basename $f`.so* .libs/compressor/; -done - # for running out of the CMake build directory if [ -e CMakeCache.txt ]; then # Out of tree build, learn source location from CMakeCache.txt diff --git a/src/xxHash b/src/xxHash index 44a6297b298e5..1f40c6511fa8d 160000 --- a/src/xxHash +++ b/src/xxHash @@ -1 +1 @@ -Subproject commit 44a6297b298e59ab7452defe859f21ed8371aa1c +Subproject commit 1f40c6511fa8dd9d2e337ca8c9bc18b3e87663c9 -- 2.39.5