From: Casey Bodley Date: Fri, 11 Nov 2016 14:53:34 +0000 (-0500) Subject: rgw: move compressor plugin out of filter X-Git-Tag: v11.1.0~220^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=59df6f15a5a227aca5058b9f2ab3003df0c32761;p=ceph.git rgw: move compressor plugin out of filter if rgw_compression_type is set to "random", we need to use the same plugin for each block of data Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc index ee4d4603690..3a193ce7f95 100644 --- a/src/rgw/rgw_compression.cc +++ b/src/rgw/rgw_compression.cc @@ -19,39 +19,26 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r if ((ofs > 0 && compressed) || // if previous part was compressed (ofs == 0)) { // or it's the first part ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; - CompressorRef compressor = Compressor::create(cct, cct->_conf->rgw_compression_type); - if (!compressor.get()) { - if (ofs > 0 && compressed) { - lderr(cct) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type - << " for next part, compression process failed" << dendl; + int cr = compressor->compress(bl, in_bl); + if (cr < 0) { + if (ofs > 0) { + lderr(cct) << "Compression failed with exit code " << cr + << " for next part, compression process failed" << dendl; return -EIO; } - // if compressor isn't available - just do not use it with log warning? - ldout(cct, 5) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type - << " for rgw, check rgw_compression_type config option" << dendl; compressed = false; + ldout(cct, 5) << "Compression failed with exit code " << cr + << " for first part, storing uncompressed" << dendl; in_bl.claim(bl); } else { - int cr = compressor->compress(bl, in_bl); - if (cr < 0) { - if (ofs > 0 && compressed) { - lderr(cct) << "Compression failed with exit code " << cr - << " for next part, compression process failed" << dendl; - return -EIO; - } - ldout(cct, 5) << "Compression failed with exit code " << cr << dendl; - compressed = false; - in_bl.claim(bl); - } else { - compressed = true; + compressed = true; - compression_block newbl; - int bs = blocks.size(); - newbl.old_ofs = ofs; - newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; - newbl.len = in_bl.length(); - blocks.push_back(newbl); - } + compression_block newbl; + int bs = blocks.size(); + newbl.old_ofs = ofs; + newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; + newbl.len = in_bl.length(); + blocks.push_back(newbl); } } else { compressed = false; diff --git a/src/rgw/rgw_compression.h b/src/rgw/rgw_compression.h index 012ba320286..a649afdfe11 100644 --- a/src/rgw/rgw_compression.h +++ b/src/rgw/rgw_compression.h @@ -35,12 +35,13 @@ public: class RGWPutObj_Compress : public RGWPutObj_Filter { CephContext* cct; - bool compressed; + bool compressed{false}; + CompressorRef compressor; std::vector blocks; public: - RGWPutObj_Compress(CephContext* cct_, RGWPutObjDataProcessor* next) : RGWPutObj_Filter(next), - cct(cct_), - compressed(false) {} + RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor, + RGWPutObjDataProcessor* next) + : RGWPutObj_Filter(next), cct(cct_), compressor(compressor) {} virtual ~RGWPutObj_Compress(){} virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 5d7fc74524e..4f35e9cddce 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2824,7 +2824,8 @@ void RGWPutObj::execute() off_t fst; off_t lst; - bool compression_enabled; + const auto& compression_type = s->cct->_conf->rgw_compression_type; + CompressorRef plugin; boost::optional compressor; bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL); @@ -2910,10 +2911,15 @@ void RGWPutObj::execute() fst = copy_source_range_fst; lst = copy_source_range_lst; - compression_enabled = s->cct->_conf->rgw_compression_type != "none"; - if (compression_enabled) { - compressor = boost::in_place(s->cct, filter); - filter = &*compressor; + if (compression_type != "none") { + plugin = Compressor::create(s->cct, compression_type); + if (!plugin) { + ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type " + << compression_type << dendl; + } else { + compressor = boost::in_place(s->cct, plugin, filter); + filter = &*compressor; + } } do { @@ -2993,8 +2999,8 @@ void RGWPutObj::execute() goto done; } - if (compression_enabled) { - compressor = boost::in_place(s->cct, filter); + if (compressor) { + compressor = boost::in_place(s->cct, plugin, filter); filter = &*compressor; } @@ -3050,10 +3056,10 @@ void RGWPutObj::execute() hash.Final(m); - if (compression_enabled && compressor->is_compressed()) { + if (compressor && compressor->is_compressed()) { bufferlist tmp; RGWCompressionInfo cs_info; - cs_info.compression_type = s->cct->_conf->rgw_compression_type; + cs_info.compression_type = plugin->get_type_name(); cs_info.orig_size = s->obj_size; cs_info.blocks = move(compressor->get_compression_blocks()); ::encode(cs_info, tmp); @@ -3168,7 +3174,6 @@ void RGWPostObj::execute() MD5 hash; buffer::list bl, aclbl; int len = 0; - bool compression_enabled; boost::optional compressor; // read in the data from the POST form @@ -3209,10 +3214,17 @@ void RGWPostObj::execute() if (op_ret < 0) return; - compression_enabled = s->cct->_conf->rgw_compression_type != "none"; - if (compression_enabled) { - compressor = boost::in_place(s->cct, filter); - filter = &*compressor; + const auto& compression_type = s->cct->_conf->rgw_compression_type; + CompressorRef plugin; + if (compression_type != "none") { + plugin = Compressor::create(s->cct, compression_type); + if (!plugin) { + ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type " + << compression_type << dendl; + } else { + compressor = boost::in_place(s->cct, plugin, filter); + filter = &*compressor; + } } while (data_pending) { @@ -3267,10 +3279,10 @@ void RGWPostObj::execute() emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl)); } - if (compression_enabled && compressor->is_compressed()) { + if (compressor && compressor->is_compressed()) { bufferlist tmp; RGWCompressionInfo cs_info; - cs_info.compression_type = s->cct->_conf->rgw_compression_type; + cs_info.compression_type = plugin->get_type_name(); cs_info.orig_size = s->obj_size; cs_info.blocks = move(compressor->get_compression_blocks()); ::encode(cs_info, tmp); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 352c87e7b7f..fd80c2afe80 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -7070,12 +7070,20 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, } boost::optional compressor; + CompressorRef plugin; RGWPutObjDataProcessor *filter = &processor; - bool compression_enabled = cct->_conf->rgw_compression_type != "none"; - if (compression_enabled) { - compressor = boost::in_place(cct, filter); - filter = &*compressor; + + const auto& compression_type = cct->_conf->rgw_compression_type; + if (compression_type != "none") { + plugin = Compressor::create(cct, compression_type); + if (!plugin) { + ldout(cct, 1) << "Cannot load plugin for rgw_compression_type " + << compression_type << dendl; + } else { + compressor = boost::in_place(cct, plugin, filter); + filter = &*compressor; + } } RGWRadosPutObj cb(cct, filter, &processor, opstate, progress_cb, progress_data); @@ -7142,10 +7150,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, } } } - if (compression_enabled && compressor->is_compressed()) { + if (compressor && compressor->is_compressed()) { bufferlist tmp; RGWCompressionInfo cs_info; - cs_info.compression_type = cct->_conf->rgw_compression_type; + cs_info.compression_type = plugin->get_type_name(); cs_info.orig_size = cb.get_data_len(); cs_info.blocks = move(compressor->get_compression_blocks()); ::encode(cs_info, tmp);