]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rgw_file uses AtomicObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:22 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index ec20ff962ecc0729853f881c3750f186b42bd28f..702f9504f6b8049cc866e5d46287b9cab5a20dac 100644 (file)
@@ -1295,6 +1295,7 @@ namespace rgw {
 
     perfcounter->inc(l_rgw_put);
     op_ret = -EINVAL;
+    rgw_obj obj{s->bucket, s->object};
 
     if (s->object.empty()) {
       ldout(s->cct, 0) << __func__ << " called on empty object" << dendl;
@@ -1314,17 +1315,29 @@ namespace rgw {
     /* early quota check skipped--we don't have size yet */
     /* skipping user-supplied etag--we might have one in future, but
      * like data it and other attrs would arrive after open */
-    processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
-                                &multipart);
-    op_ret = processor->prepare(get_store(), NULL);
+
+    aio.emplace(s->cct->_conf->rgw_put_obj_min_window_size);
+
+    if (s->bucket_info.versioning_enabled()) {
+      if (!version_id.empty()) {
+        obj.key.set_instance(version_id);
+      } else {
+        get_store()->gen_rand_obj_instance_name(&obj);
+        version_id = obj.key.instance;
+      }
+    }
+    processor.emplace(&*aio, get_store(), s->bucket_info,
+                      s->bucket_owner.get_id(),
+                      *static_cast<RGWObjectCtx *>(s->obj_ctx),
+                      obj, olh_epoch, s->req_id);
+
+    op_ret = processor->prepare();
     if (op_ret < 0) {
       ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
                        << dendl;
       goto done;
     }
-
-    filter = processor;
-#if 0
+    filter = &*processor;
     if (compression_type != "none") {
       plugin = Compressor::create(s->cct, compression_type);
       if (! plugin) {
@@ -1335,7 +1348,6 @@ namespace rgw {
         filter = &*compressor;
       }
     }
-#endif
 
   done:
     return op_ret;
@@ -1359,56 +1371,10 @@ namespace rgw {
     if (! len)
       return 0;
 
-    /* XXX we are currently synchronous--supplied data buffers cannot
-     * be used after the caller returns  */
-    bool need_to_wait = true;
-    bufferlist orig_data;
-
-    if (need_to_wait) {
-      orig_data = data;
-    }
     hash.Update((const unsigned char *)data.c_str(), data.length());
-    op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
+    op_ret = filter->process(std::move(data), ofs);
     if (op_ret < 0) {
-      if (!need_to_wait || op_ret != -EEXIST) {
-       ldout(s->cct, 20) << "processor->thottle_data() returned ret="
-                         << op_ret << dendl;
-       goto done;
-      }
-
-      ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
-
-      /* restore original data */
-      data.swap(orig_data);
-
-      /* restart processing with different oid suffix */
-      dispose_processor(processor);
-      processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
-                                  &multipart);
-      filter = processor;
-
-      string oid_rand;
-      char buf[33];
-      gen_rand_alphanumeric(get_store()->ctx(), buf, sizeof(buf) - 1);
-      oid_rand.append(buf);
-
-      op_ret = processor->prepare(get_store(), &oid_rand);
-      if (op_ret < 0) {
-       ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
-                        << op_ret << dendl;
-       goto done;
-      }
-#if 0
-      /* restore compression filter, if any */
-      if (compressor) {
-       compressor.emplace(s->cct, plugin, filter);
-       filter = &*compressor;
-      }
-#endif
-      op_ret = put_data_and_throttle(filter, data, ofs, false);
-      if (op_ret < 0) {
-       goto done;
-      }
+      goto done;
     }
     bytes_written += len;
 
@@ -1432,6 +1398,12 @@ namespace rgw {
     s->obj_size = bytes_written;
     perfcounter->inc(l_rgw_put_b, s->obj_size);
 
+    // flush data in filters
+    op_ret = filter->process({}, s->obj_size);
+    if (op_ret < 0) {
+      goto done;
+    }
+
     op_ret = get_store()->check_quota(s->bucket_owner.get_id(), s->bucket,
                                      user_quota, bucket_quota, s->obj_size);
     if (op_ret < 0) {
@@ -1503,7 +1475,7 @@ namespace rgw {
 
     op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
                                  (delete_at ? *delete_at : real_time()),
-                                if_match, if_nomatch);
+                                if_match, if_nomatch, nullptr, nullptr, nullptr);
     if (op_ret != 0) {
       /* revert attr updates */
       rgw_fh->set_mtime(omtime);
@@ -1512,7 +1484,6 @@ namespace rgw {
     }
 
   done:
-    dispose_processor(processor);
     perfcounter->tinc(l_rgw_put_lat, s->time_elapsed());
     return op_ret;
   } /* exec_finish */
index 87bd33ac3443a7e93800563a13c464ea7f8d095c..a98d2832e93f435d40b5d9f6cf15c5c2f0a5f7b8 100644 (file)
@@ -34,6 +34,8 @@
 #include "rgw_lib.h"
 #include "rgw_ldap.h"
 #include "rgw_token.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_putobj_throttle.h"
 #include "rgw_compression.h"
 
 
@@ -2289,8 +2291,9 @@ public:
   const std::string& bucket_name;
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
-  RGWPutObjProcessor* processor;
-  RGWPutObjDataProcessor* filter;
+  std::optional<rgw::putobj::AioThrottle> aio;
+  std::optional<rgw::putobj::AtomicObjectProcessor> processor;
+  rgw::putobj::DataProcessor* filter;
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
   buffer::list data;
@@ -2298,14 +2301,13 @@ public:
   MD5 hash;
   off_t real_ofs;
   size_t bytes_written;
-  bool multipart;
   bool eio;
 
   RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
                  const std::string& _bname, const std::string& _oname)
     : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
-      rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
-      bytes_written(0), multipart(false), eio(false) {
+      rgw_fh(_fh), filter(nullptr), real_ofs(0),
+      bytes_written(0), eio(false) {
 
     int ret = header_init();
     if (ret == 0) {
@@ -2348,19 +2350,6 @@ public:
     return 0;
   }
 
-  RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx,
-                                       bool *is_multipart) override {
-    struct req_state* s = get_state();
-    uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
-    RGWPutObjProcessor_Atomic *processor =
-      new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket,
-                                   s->object.name, part_size, s->req_id,
-                                   s->bucket_info.versioning_enabled());
-    processor->set_olh_epoch(olh_epoch);
-    processor->set_version_id(version_id);
-    return processor;
-  }
-
   int get_params() override {
     struct req_state* s = get_state();
     RGWAccessControlPolicy_S3 s3policy(s->cct);