]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add buffering filter to compression for fetch_remote_obj 21479/head
author: Casey Bodley <cbodley@redhat.com>
Fri, 13 Apr 2018 20:01:48 +0000 (16:01 -0400)
committer: Casey Bodley <cbodley@redhat.com>
Mon, 23 Apr 2018 15:35:00 +0000 (11:35 -0400)
fetch_remote_obj() only gets 16 KiB blocks from libcurl, which leads to a
much worse compression ratio than the 4 MiB blocks in normal PUT requests.

Fixes: http://tracker.ceph.com/issues/23547
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_rados.cc

index 0fb6bdab765c18efc199f0cc49004c77710cb6ea..3b7034bc5b6f22f5927800f00aeeb68ff5d3fbf6 100644 (file)
@@ -7489,12 +7489,44 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
+// PutObj filter that accumulates incoming data in an internal bufferlist so the next filter
+// (the compressor, in fetch_remote_obj) is not handed tiny blocks. libcurl reads in 16k at a time,
+// and we need at least 64k to get a good compression ratio. Empty input (or *again) forwards the buffer.
+class RGWPutObj_Buffer : public RGWPutObj_Filter {
+  const unsigned buffer_size; // forwarding threshold in bytes; asserted to be a power of 2 in the constructor
+  bufferlist buffer; // data received so far that has not yet been handed to the next filter
+ public:
+  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
+    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
+    assert(isp2(buffer_size)); // must be power of 2, since p2align() below is called with it as the alignment
+  }
+
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
+                  bool *again) override {
+    if (*again || !bl.length()) {
+      // flush buffered data. NOTE(review): ofs is forwarded as-is here, not rebased to the start of `buffer` as below -- confirm downstream filters expect that
+      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
+    }
+    // transform offset to the beginning of the buffer (must happen before bl is appended, while buffer holds only older data)
+    ofs = ofs - buffer.length();
+    buffer.claim_append(bl);
+    if (buffer.length() < buffer_size) {
+      *again = false; // don't come back until there's more data
+      return 0; // nothing forwarded yet; the data stays in `buffer`
+    }
+    const auto count = p2align(buffer.length(), buffer_size); // presumably rounds down to a multiple of buffer_size -- verify against p2align's definition
+    buffer.splice(0, count, &bl); // move the first `count` bytes back into bl; any remainder stays buffered
+    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
+  }
+};
+
 class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
 {
   CephContext* cct;
   rgw_obj obj;
   RGWPutObjDataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
+  boost::optional<RGWPutObj_Buffer> buffering;
   CompressorRef& plugin;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
@@ -7542,7 +7574,9 @@ public:
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
-      filter = &*compressor;
+      constexpr unsigned buffer_size = 512 * 1024;
+      buffering = boost::in_place(&*compressor, buffer_size);
+      filter = &*buffering;
     }
     return 0;
   }
@@ -7616,6 +7650,11 @@ public:
     return 0;
   }
 
+  int flush() { // sends a zero-length bufferlist through `filter`; returns put_data_and_throttle()'s result
+    bufferlist bl; // left empty on purpose: RGWPutObj_Buffer::handle_data treats !bl.length() as a request to forward its buffered data
+    return put_data_and_throttle(filter, bl, ofs, false);
+  }
+
   bufferlist& get_extra_data() { return extra_data_bl; }
 
   map<string, bufferlist>& get_attrs() { return src_attrs; }
@@ -8051,6 +8090,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     goto set_err_state;
   }
+  ret = cb.flush();
+  if (ret < 0) {
+    goto set_err_state;
+  }
   if (compressor && compressor->is_compressed()) {
     bufferlist tmp;
     RGWCompressionInfo cs_info;