From: Matt Benjamin Date: Thu, 4 May 2017 19:47:39 +0000 (-0400) Subject: rgw_file: enable compression filter (stores) X-Git-Tag: v12.1.1~83^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6c3e22bfc2a653131779cc2e24fe9573e915ac3c;p=ceph-ci.git rgw_file: enable compression filter (stores) Previously NFS clients could access compressed data, but the RGWWriteDataRequest lacked glue to attach an inbound compression filter. Fixes: http://tracker.ceph.com/issues/20462 Signed-off-by: Matt Benjamin --- diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index ca8a5c4b4b8..89d53f7dfb9 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -1286,6 +1286,27 @@ namespace rgw { processor = select_processor(*static_cast(s->obj_ctx), &multipart); op_ret = processor->prepare(get_store(), NULL); + if (op_ret < 0) { + ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret + << dendl; + goto done; + } + + filter = processor; + + if (compression_type == "none") + compression_type = get_store()->get_zone_params().get_compression_type( + s->bucket_info.placement_rule); + if (compression_type != "none") { + plugin = Compressor::create(s->cct, compression_type); + if (! plugin) { + ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type " + << compression_type << dendl; + } else { + compressor.emplace(s->cct, plugin, filter); + filter = &*compressor; + } + } done: return op_ret; @@ -1313,8 +1334,7 @@ namespace rgw { orig_data = data; } hash.Update((const byte *)data.c_str(), data.length()); - op_ret = put_data_and_throttle(processor, data, ofs, - need_to_wait); + op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait); if (op_ret < 0) { if (!need_to_wait || op_ret != -EEXIST) { ldout(s->cct, 20) << "processor->thottle_data() returned ret=" @@ -1331,6 +1351,7 @@ namespace rgw { dispose_processor(processor); processor = select_processor(*static_cast(s->obj_ctx), &multipart); + filter = processor; string oid_rand; char buf[33]; @@ -1344,6 +1365,12 @@ namespace rgw { goto done; } + /* restore compression filter, if any */ + if (compressor) { + compressor.emplace(s->cct, plugin, filter); + filter = &*compressor; + } + op_ret = put_data_and_throttle(processor, data, ofs, false); if (op_ret < 0) { goto done; @@ -1384,6 +1411,20 @@ namespace rgw { hash.Final(m); + if (compressor && compressor->is_compressed()) { + bufferlist tmp; + RGWCompressionInfo cs_info; + cs_info.compression_type = plugin->get_type_name(); + cs_info.orig_size = s->obj_size; + cs_info.blocks = std::move(compressor->get_compression_blocks()); + ::encode(cs_info, tmp); + attrs[RGW_ATTR_COMPRESSION] = tmp; + ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION + << " with type=" << cs_info.compression_type + << ", orig_size=" << cs_info.orig_size + << ", blocks=" << cs_info.blocks.size() << dendl; + } + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); etag = calc_md5; diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index b93b5928409..9a07d8340bb 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -34,6 +34,7 @@ #include "rgw_lib.h" #include "rgw_ldap.h" #include "rgw_token.h" +#include "rgw_compression.h" /* XXX @@ -2186,7 +2187,11 @@ public: const std::string& bucket_name; const std::string& obj_name; RGWFileHandle* rgw_fh; - RGWPutObjProcessor *processor; + RGWPutObjProcessor* processor; + RGWPutObjDataProcessor* filter; + boost::optional compressor; + std::string compression_type = "none"; + CompressorRef plugin; buffer::list data; uint64_t timer_id; MD5 hash; @@ -2198,8 +2203,8 @@ public: RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh, const std::string& _bname, const std::string& _oname) : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname), - rgw_fh(_fh), processor(nullptr), real_ofs(0), bytes_written(0), - multipart(false), eio(false) { + rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0), + bytes_written(0), multipart(false), eio(false) { int ret = header_init(); if (ret == 0) {