]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWBulkUploadOp uses AtomicObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:32 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_op.cc

index ec784713f278fb8bb8d1668402fef513fe54e588..dacc324e87273c5d719eefca2b293fb36e8e94b1 100644 (file)
@@ -6688,9 +6688,6 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
 
   ldpp_dout(this, 20) << "got file=" << path << ", size=" << size << dendl;
 
-  RGWPutObjDataProcessor *filter = nullptr;
-  boost::optional<RGWPutObj_Compress> compressor;
-
   if (size > static_cast<size_t>(s->cct->_conf->rgw_max_put_size)) {
     op_ret = -ERR_TOO_LARGE;
     return op_ret;
@@ -6731,28 +6728,30 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
-  RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      binfo,
-                                      binfo.bucket,
-                                      object.name,
-                                      /* part size */
-                                      s->cct->_conf->rgw_obj_stripe_size,
-                                      s->req_id,
-                                      binfo.versioning_enabled());
+  rgw_obj obj(binfo.bucket, object);
+  if (s->bucket_info.versioning_enabled()) {
+    store->gen_rand_obj_instance_name(&obj);
+  }
 
-  /* No filters by default. */
-  filter = &processor;
+  using namespace rgw::putobj;
+  AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+
+  AtomicObjectProcessor processor(&aio, store, binfo, bowner.get_id(),
+                                  obj_ctx, obj, 0, s->req_id);
 
-  op_ret = processor.prepare(store, nullptr);
+  op_ret = processor.prepare();
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl;
     return op_ret;
   }
 
+  /* No filters by default. */
+  DataProcessor *filter = &processor;
+
   const auto& compression_type = store->get_zone_params().get_compression_type(
       binfo.placement_rule);
   CompressorRef plugin;
-#if 0
+  boost::optional<RGWPutObj_Compress> compressor;
   if (compression_type != "none") {
     plugin = Compressor::create(s->cct, compression_type);
     if (! plugin) {
@@ -6763,7 +6762,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
       filter = &*compressor;
     }
   }
-#endif
+
   /* Upload file content. */
   ssize_t len = 0;
   size_t ofs = 0;
@@ -6778,9 +6777,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
       return op_ret;
     } else if (len > 0) {
       hash.Update((const unsigned char *)data.c_str(), data.length());
-      op_ret = put_data_and_throttle(filter, data, ofs, false);
+      op_ret = filter->process(std::move(data), ofs);
       if (op_ret < 0) {
-        ldpp_dout(this, 20) << "processor->thottle_data() returned ret=" << op_ret << dendl;
+        ldpp_dout(this, 20) << "filter->process() returned ret=" << op_ret << dendl;
         return op_ret;
       }
 
@@ -6789,9 +6788,16 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
 
   } while (len > 0);
 
+  // flush
+  op_ret = filter->process({}, ofs);
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
   if (ofs != size) {
     ldpp_dout(this, 10) << "real file size different from declared" << dendl;
     op_ret = -EINVAL;
+    return op_ret;
   }
 
   op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
@@ -6837,8 +6843,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
   }
 
   /* Complete the transaction. */
-  op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs,
-                              ceph::real_time() /* delete_at */);
+  op_ret = processor.complete(size, etag, nullptr, ceph::real_time(),
+                              attrs, ceph::real_time() /* delete_at */,
+                              nullptr, nullptr, nullptr, nullptr, nullptr);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "processor::complete returned op_ret=" << op_ret << dendl;
   }