]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: PostObj uses AtomicObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:30 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_op.cc

index 28c816e9d491bd03f66fbb54d483b9040f4c86c8..ec784713f278fb8bb8d1668402fef513fe54e588 100644 (file)
@@ -3883,7 +3883,6 @@ void RGWPostObj::pre_exec()
 
 void RGWPostObj::execute()
 {
-  RGWPutObjDataProcessor *filter = nullptr;
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
   char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
@@ -3926,7 +3925,6 @@ void RGWPostObj::execute()
   /* Start iteration over data fields. It's necessary as Swift's FormPost
    * is capable to handle multiple files in single form. */
   do {
-    std::unique_ptr<RGWPutObjDataProcessor> encrypt;
     char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
     unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
     MD5 hash;
@@ -3962,23 +3960,26 @@ void RGWPostObj::execute()
       ldpp_dout(this, 15) << "supplied_md5=" << supplied_md5 << dendl;
     }
 
-    RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
-                                        s->bucket_info,
-                                        s->bucket,
-                                        get_current_filename(),
-                                        /* part size */
-                                        s->cct->_conf->rgw_obj_stripe_size,
-                                        s->req_id,
-                                        s->bucket_info.versioning_enabled());
-    processor.set_olh_epoch(0);
-    /* No filters by default. */
-    filter = &processor;
+    rgw_obj obj(s->bucket, get_current_filename());
+    if (s->bucket_info.versioning_enabled()) {
+      store->gen_rand_obj_instance_name(&obj);
+    }
 
-    op_ret = processor.prepare(store, nullptr);
+    using namespace rgw::putobj;
+    AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
+    AtomicObjectProcessor processor(&aio, store, s->bucket_info,
+                                    s->bucket_owner.get_id(),
+                                    *static_cast<RGWObjectCtx*>(s->obj_ctx),
+                                    obj, 0, s->req_id);
+    op_ret = processor.prepare();
     if (op_ret < 0) {
       return;
     }
-#if 0
+
+    /* No filters by default. */
+    DataProcessor *filter = &processor;
+
+    std::unique_ptr<DataProcessor> encrypt;
     op_ret = get_encrypt_filter(&encrypt, filter);
     if (op_ret < 0) {
       return;
@@ -3999,7 +4000,7 @@ void RGWPostObj::execute()
         }
       }
     }
-#endif
+
     bool again;
     do {
       ceph::bufferlist data;
@@ -4015,7 +4016,7 @@ void RGWPostObj::execute()
       }
 
       hash.Update((const unsigned char *)data.c_str(), data.length());
-      op_ret = put_data_and_throttle(filter, data, ofs, false);
+      op_ret = filter->process(std::move(data), ofs);
 
       ofs += len;
 
@@ -4025,9 +4026,10 @@ void RGWPostObj::execute()
       }
     } while (again);
 
-    {
-      bufferlist flush;
-      op_ret = put_data_and_throttle(filter, flush, ofs, false);
+    // flush
+    op_ret = filter->process({}, ofs);
+    if (op_ret < 0) {
+      return;
     }
 
     if (len < min_len) {
@@ -4082,8 +4084,12 @@ void RGWPostObj::execute()
       emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
     }
 
-    op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(),
-                                attrs, (delete_at ? *delete_at : real_time()));
+    op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), attrs,
+                                (delete_at ? *delete_at : real_time()),
+                                nullptr, nullptr, nullptr, nullptr, nullptr);
+    if (op_ret < 0) {
+      return;
+    }
   } while (is_next_file_to_upload());
 }