From 13160222fd6894a2851e61fd909d7332747ecdca Mon Sep 17 00:00:00 2001
From: Casey Bodley
Date: Fri, 13 Apr 2018 16:01:48 -0400
Subject: [PATCH] rgw: add buffering filter to compression for fetch_remote_obj

fetch_remote_obj() only gets 16k blocks from libcurl, which leads to a much
worse compression ratio than the 4m blocks in normal PUT requests

Fixes: http://tracker.ceph.com/issues/23547

Signed-off-by: Casey Bodley
---
 src/rgw/rgw_rados.cc | 45 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 0fb6bdab765c1..3b7034bc5b6f2 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -7489,12 +7489,44 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
+// PutObj filter that buffers data so we don't try to compress tiny blocks.
+// libcurl reads in 16k at a time, and we need at least 64k to get a good
+// compression ratio
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+  const unsigned buffer_size;
+  bufferlist buffer;
+ public:
+  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+    assert(isp2(buffer_size)); // must be power of 2
+  }
+
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+                  bool *again) override {
+    if (*again || !bl.length()) {
+      // flush buffered data
+      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+    }
+    // transform offset to the beginning of the buffer
+    ofs = ofs - buffer.length();
+    buffer.claim_append(bl);
+    if (buffer.length() < buffer_size) {
+      *again = false; // don't come back until there's more data
+      return 0;
+    }
+    const auto count = p2align(buffer.length(), buffer_size);
+    buffer.splice(0, count, &bl);
+    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+  }
+};
+
 class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
 {
   CephContext* cct;
   rgw_obj obj;
   RGWPutObjDataProcessor *filter;
   boost::optional& compressor;
+  boost::optional<RGWPutObj_Buffer> buffering;
   CompressorRef& plugin;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
@@ -7542,7 +7574,9 @@ public:
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
-      filter = &*compressor;
+      constexpr unsigned buffer_size = 512 * 1024;
+      buffering = boost::in_place(&*compressor, buffer_size);
+      filter = &*buffering;
     }
     return 0;
   }
@@ -7616,6 +7650,11 @@ public:
     return 0;
   }
 
+  int flush() {
+    bufferlist bl;
+    return put_data_and_throttle(filter, bl, ofs, false);
+  }
+
   bufferlist& get_extra_data() { return extra_data_bl; }
 
   map& get_attrs() { return src_attrs; }
@@ -8051,6 +8090,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
+  ret = cb.flush();
+  if (ret < 0) {
+    goto set_err_state;
+  }
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;
-- 
2.47.3