From: Casey Bodley Date: Fri, 13 Apr 2018 20:01:48 +0000 (-0400) Subject: rgw: add buffering filter to compression for fetch_remote_obj X-Git-Tag: v12.2.6~129^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5d755dacc025ea24b98e87b5e94e68708783c63c;p=ceph.git rgw: add buffering filter to compression for fetch_remote_obj fetch_remote_obj() only gets 16k blocks from libcurl, which leads to a much worse compression ratio than the 4m blocks in normal PUT requests Fixes: http://tracker.ceph.com/issues/23547 Signed-off-by: Casey Bodley (cherry picked from commit 13160222fd6894a2851e61fd909d7332747ecdca) Conflicts: src/rgw/rgw_rados.cc (template functions for intarith macros were added in c06b97b3d7e36b5b3be38f1ffa121611bea61a52, and RGWGetDataCB was changed in b2143cded0e971361cdb089db19a6f69ce5b74dd) --- diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index fd30c347b996..45c6f426296c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -7314,12 +7314,44 @@ bool RGWRados::aio_completed(void *handle) return c->is_safe(); } +// PutObj filter that buffers data so we don't try to compress tiny blocks. +// libcurl reads in 16k at a time, and we need at least 64k to get a good +// compression ratio +class RGWPutObj_Buffer : public RGWPutObj_Filter { + const unsigned buffer_size; + bufferlist buffer; + public: + RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size) + : RGWPutObj_Filter(next), buffer_size(buffer_size) { + assert(ISP2(buffer_size)); // must be power of 2 + } + + int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, + bool *again) override { + if (*again || !bl.length()) { + // flush buffered data + return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again); + } + // transform offset to the beginning of the buffer + ofs = ofs - buffer.length(); + buffer.claim_append(bl); + if (buffer.length() < buffer_size) { + *again = false; // don't come back until there's more data + return 0; + } + const auto count = P2ALIGN(buffer.length(), buffer_size); + buffer.splice(0, count, &bl); + return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again); + } +}; + class RGWRadosPutObj : public RGWGetDataCB { CephContext* cct; rgw_obj obj; RGWPutObjDataProcessor *filter; boost::optional& compressor; + boost::optional buffering; CompressorRef& plugin; RGWPutObjProcessor_Atomic *processor; RGWOpStateSingleOp *opstate; @@ -7365,7 +7397,9 @@ public: if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) { //do not compress if object is encrypted compressor = boost::in_place(cct, plugin, filter); - filter = &*compressor; + constexpr unsigned buffer_size = 512 * 1024; + buffering = boost::in_place(&*compressor, buffer_size); + filter = &*buffering; } return 0; } @@ -7437,6 +7471,11 @@ public: return 0; } + int flush() { + bufferlist bl; + return put_data_and_throttle(filter, bl, 0, false); + } + bufferlist& get_extra_data() { return extra_data_bl; } map& get_attrs() { return src_attrs; } @@ -7866,6 +7905,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, if (ret < 0) { goto set_err_state; } + ret = cb.flush(); + if (ret < 0) { + goto set_err_state; + } if (compressor && compressor->is_compressed()) { bufferlist tmp; RGWCompressionInfo cs_info;