]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: prepare RGWRadosPutObj for fetch_remote_obj
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:26 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_rados.cc

index 6ac2daf0058cfa582e485076a99ef4632e696f48..1e498e4623f147e5d24f269375b7132a77e210bb 100644 (file)
@@ -7538,59 +7538,28 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
-// PutObj filter that buffers data so we don't try to compress tiny blocks.
-// libcurl reads in 16k at a time, and we need at least 64k to get a good
-// compression ratio
-class RGWPutObj_Buffer : public RGWPutObj_Filter {
-  const unsigned buffer_size;
-  bufferlist buffer;
- public:
-  RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
-    : RGWPutObj_Filter(next), buffer_size(buffer_size) {
-    ceph_assert(isp2(buffer_size)); // must be power of 2
-  }
-
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
-                  bool *again) override {
-    if (*again || !bl.length()) {
-      // flush buffered data
-      return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
-    }
-    // transform offset to the beginning of the buffer
-    ofs = ofs - buffer.length();
-    buffer.claim_append(bl);
-    if (buffer.length() < buffer_size) {
-      *again = false; // don't come back until there's more data
-      return 0;
-    }
-    const auto count = p2align(buffer.length(), buffer_size);
-    buffer.splice(0, count, &bl);
-    return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
-  }
-};
-
 class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
 {
   CephContext* cct;
   rgw_obj obj;
-  RGWPutObjDataProcessor *filter;
+  rgw::putobj::DataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
-  boost::optional<RGWPutObj_Buffer> buffering;
+  boost::optional<rgw::putobj::ChunkProcessor> buffering;
   CompressorRef& plugin;
-  RGWPutObjProcessor_Atomic *processor;
+  rgw::putobj::ObjectProcessor *processor;
   void (*progress_cb)(off_t, void *);
   void *progress_data;
   bufferlist extra_data_bl;
   uint64_t extra_data_left;
   uint64_t data_len;
   map<string, bufferlist> src_attrs;
-  off_t ofs{0};
-  off_t lofs{0}; /* logical ofs */
+  uint64_t ofs{0};
+  uint64_t lofs{0}; /* logical ofs */
 public:
   RGWRadosPutObj(CephContext* cct,
                  CompressorRef& plugin,
                  boost::optional<RGWPutObj_Compress>& compressor,
-                 RGWPutObjProcessor_Atomic *p,
+                 rgw::putobj::ObjectProcessor *p,
                  void (*_progress_cb)(off_t, void *),
                  void *_progress_data) :
                        cct(cct),
@@ -7619,22 +7588,23 @@ public:
 
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
-#if 0
       compressor = boost::in_place(cct, plugin, filter);
+      // add a filter that buffers data so we don't try to compress tiny blocks.
+      // libcurl reads in 16k at a time, and we need at least 64k to get a good
+      // compression ratio
       constexpr unsigned buffer_size = 512 * 1024;
       buffering = boost::in_place(&*compressor, buffer_size);
       filter = &*buffering;
-#endif
     }
     return 0;
   }
 
   int handle_data(bufferlist& bl, bool *pause) override {
     if (progress_cb) {
-      progress_cb(lofs, progress_data);
+      progress_cb(data_len, progress_data);
     }
     if (extra_data_left) {
-      size_t extra_len = bl.length();
+      uint64_t extra_len = bl.length();
       if (extra_len > extra_data_left)
         extra_len = extra_data_left;
 
@@ -7656,32 +7626,17 @@ public:
 
     ceph_assert(uint64_t(ofs) >= extra_data_len);
 
-    lofs = ofs - extra_data_len;
-
-    data_len += bl.length();
-    bool again = false;
-
-    do {
-      void *handle = NULL;
-      rgw_raw_obj obj;
-      uint64_t size = bl.length();
-      int ret = filter->handle_data(bl, lofs, &handle, &obj, &again);
-      if (ret < 0)
-        return ret;
-
-      ofs += size;
+    uint64_t size = bl.length();
+    ofs += size;
 
-      ret = filter->throttle_data(handle, obj, size, false);
-      if (ret < 0)
-        return ret;
-    } while (again);
+    const uint64_t lofs = data_len;
+    data_len += size;
 
-    return 0;
+    return filter->process(std::move(bl), lofs);
   }
 
   int flush() {
-    bufferlist bl;
-    return put_data_and_throttle(filter, bl, ofs, false);
+    return filter->process({}, data_len);
   }
 
   bufferlist& get_extra_data() { return extra_data_bl; }
@@ -7698,12 +7653,9 @@ public:
   }
 
   int complete(const string& etag, real_time *mtime, real_time set_mtime,
-               map<string, bufferlist>& attrs, real_time delete_at, rgw_zone_set *zones_trace) {
-    return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace);
-  }
-
-  bool is_canceled() {
-    return processor->is_canceled();
+               map<string, bufferlist>& attrs, real_time delete_at,
+               rgw_zone_set *zones_trace, bool *canceled) {
+    return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace, canceled);
   }
 };
 
@@ -8164,11 +8116,12 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
 #define MAX_COMPLETE_RETRY 100
   for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
-    ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace);
+    bool canceled = false;
+    ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace, &canceled);
     if (ret < 0) {
       goto set_err_state;
     }
-    if (copy_if_newer && cb.is_canceled()) {
+    if (copy_if_newer && canceled) {
       ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
       obj_ctx.obj.invalidate(dest_obj); /* object was overwritten */
       ret = get_obj_state(&obj_ctx, dest_bucket_info, dest_obj, &dest_state, false);