]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: PutObj::execute() uses new ObjectProcessors
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:18 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index fc913222f22cd5bcb5b4c4b429821ed5f1a11f8b..28c816e9d491bd03f66fbb54d483b9040f4c86c8 100644 (file)
@@ -21,6 +21,7 @@
 #include "common/mime.h"
 #include "common/utf8.h"
 #include "common/ceph_json.h"
+#include "common/static_ptr.h"
 
 #include "rgw_rados.h"
 #include "rgw_op.h"
@@ -42,6 +43,8 @@
 #include "rgw_compression.h"
 #include "rgw_role.h"
 #include "rgw_tag_s3.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_putobj_throttle.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/rgw/cls_rgw_client.h"
 
@@ -3536,9 +3539,6 @@ static CompressorRef get_compressor_plugin(const req_state *s,
 
 void RGWPutObj::execute()
 {
-  std::unique_ptr<RGWPutObjProcessor> processor;
-  RGWPutObjDataProcessor *filter = nullptr;
-  std::unique_ptr<RGWPutObjDataProcessor> encrypt;
   char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
   char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
@@ -3546,14 +3546,9 @@ void RGWPutObj::execute()
   MD5 hash;
   bufferlist bl, aclbl, bs;
   int len;
-  bool multipart;
   
   off_t fst;
   off_t lst;
-  const auto& compression_type = store->get_zone_params().get_compression_type(
-      s->bucket_info.placement_rule);
-  CompressorRef plugin;
-  boost::optional<RGWPutObj_Compress> compressor;
 
   bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
   perfcounter->inc(l_rgw_put);
@@ -3616,15 +3611,12 @@ void RGWPutObj::execute()
     supplied_md5[sizeof(supplied_md5) - 1] = '\0';
   }
 
+  const bool multipart = !multipart_upload_id.empty();
   auto& obj_ctx = *static_cast<RGWObjectCtx*>(s->obj_ctx);
-  processor.reset(select_processor(obj_ctx, &multipart));
-
-  // no filters by default
-  filter = processor.get();
+  rgw_obj obj{s->bucket, s->object};
 
   /* Handle object versioning of Swift API. */
   if (! multipart) {
-    rgw_obj obj(s->bucket, s->object);
     op_ret = store->swift_versioning_copy(obj_ctx,
                                           s->bucket_owner.get_id(),
                                           s->bucket_info,
@@ -3634,7 +3626,32 @@ void RGWPutObj::execute()
     }
   }
 
-  op_ret = processor->prepare(store, NULL);
+  // create the object processor
+  using namespace rgw::putobj;
+  AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  constexpr auto max_processor_size = std::max(sizeof(MultipartObjectProcessor),
+                                               sizeof(AtomicObjectProcessor));
+  ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
+
+  if (multipart) {
+    processor.emplace<MultipartObjectProcessor>(
+        &aio, store, s->bucket_info, s->owner.get_id(), obj_ctx, obj,
+        multipart_upload_id, multipart_part_num, multipart_part_str);
+  } else {
+    if (s->bucket_info.versioning_enabled()) {
+      if (!version_id.empty()) {
+        obj.key.set_instance(version_id);
+      } else {
+        store->gen_rand_obj_instance_name(&obj);
+        version_id = obj.key.instance;
+      }
+    }
+    processor.emplace<AtomicObjectProcessor>(
+        &aio, store, s->bucket_info, s->bucket_owner.get_id(),
+        obj_ctx, obj, olh_epoch, s->req_id);
+  }
+
+  op_ret = processor->prepare();
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "processor->prepare() returned ret=" << op_ret
                      << dendl;
@@ -3662,27 +3679,32 @@ void RGWPutObj::execute()
   }
 
   fst = copy_source_range_fst;
-#if 0
+
+  // no filters by default
+  DataProcessor *filter = processor.get();
+
+  const auto& compression_type = store->get_zone_params().get_compression_type(
+      s->bucket_info.placement_rule);
+  CompressorRef plugin;
+  boost::optional<RGWPutObj_Compress> compressor;
+
+  std::unique_ptr<DataProcessor> encrypt;
   op_ret = get_encrypt_filter(&encrypt, filter);
   if (op_ret < 0) {
     return;
   }
   if (encrypt != nullptr) {
-    filter = encrypt.get();
-  } else {
-    //no encryption, we can try compression
-    if (compression_type != "none") {
-      plugin = get_compressor_plugin(s, compression_type);
-      if (!plugin) {
-        ldpp_dout(this, 1) << "Cannot load plugin for compression type "
-            << compression_type << dendl;
-      } else {
-        compressor.emplace(s->cct, plugin, filter);
-        filter = &*compressor;
-      }
+    filter = &*encrypt;
+  } else if (compression_type != "none") {
+    plugin = get_compressor_plugin(s, compression_type);
+    if (!plugin) {
+      ldpp_dout(this, 1) << "Cannot load plugin for compression type "
+          << compression_type << dendl;
+    } else {
+      compressor.emplace(s->cct, plugin, filter);
+      filter = &*compressor;
     }
   }
-#endif
   tracepoint(rgw_op, before_data_transfer, s->req_id.c_str());
   do {
     bufferlist data;
@@ -3703,6 +3725,8 @@ void RGWPutObj::execute()
       op_ret = len;
       ldpp_dout(this, 20) << "get_data() returned ret=" << op_ret << dendl;
       return;
+    } else if (len == 0) {
+      break;
     }
 
     if (need_calc_md5) {
@@ -3712,75 +3736,21 @@ void RGWPutObj::execute()
     /* update torrrent */
     torrent.update(data);
 
-    /* do we need this operation to be synchronous? if we're dealing with an object with immutable
-     * head, e.g., multipart object we need to make sure we're the first one writing to this object
-     */
-    bool need_to_wait = (ofs == 0) && multipart;
-
-    bufferlist orig_data;
-
-    if (need_to_wait) {
-      orig_data = data;
-    }
-
-    op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
+    op_ret = filter->process(std::move(data), ofs);
     if (op_ret < 0) {
-      if (op_ret != -EEXIST) {
-        ldpp_dout(this, 20) << "processor->thottle_data() returned ret="
-                         << op_ret << dendl;
-        return;
-      }
-      /* need_to_wait == true and op_ret == -EEXIST */
-      ldpp_dout(this, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
-
-      /* restore original data */
-      data.swap(orig_data);
-
-      /* restart processing with different oid suffix */
-      processor.reset(select_processor(obj_ctx, &multipart));
-      filter = processor.get();
-
-      string oid_rand;
-      char buf[33];
-      gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
-      oid_rand.append(buf);
-
-      op_ret = processor->prepare(store, &oid_rand);
-      if (op_ret < 0) {
-        ldpp_dout(this, 0) << "ERROR: processor->prepare() returned "
-                        << op_ret << dendl;
-        return;
-      }
-#if 0
-      op_ret = get_encrypt_filter(&encrypt, filter);
-      if (op_ret < 0) {
-        return;
-      }
-      if (encrypt != nullptr) {
-        filter = encrypt.get();
-      } else {
-        if (compressor) {
-          compressor.emplace(s->cct, plugin, filter);
-          filter = &*compressor;
-        }
-      }
-#endif
-      op_ret = put_data_and_throttle(filter, data, ofs, false);
-      if (op_ret < 0) {
-        return;
-      }
+      ldpp_dout(this, 20) << "processor->process() returned ret="
+          << op_ret << dendl;
+      return;
     }
 
     ofs += len;
   } while (len > 0);
   tracepoint(rgw_op, after_data_transfer, s->req_id.c_str(), ofs);
 
-  {
-    bufferlist flush;
-    op_ret = put_data_and_throttle(filter, flush, ofs, false);
-    if (op_ret < 0) {
-      return;
-    }
+  // flush any data in filters
+  op_ret = filter->process({}, ofs);
+  if (op_ret < 0) {
+    return;
   }
 
   if (!chunked_upload && ofs != s->content_length) {
@@ -3884,13 +3854,9 @@ void RGWPutObj::execute()
   tracepoint(rgw_op, processor_complete_enter, s->req_id.c_str());
   op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
                                (delete_at ? *delete_at : real_time()), if_match, if_nomatch,
-                               (user_data.empty() ? nullptr : &user_data));
+                               (user_data.empty() ? nullptr : &user_data), nullptr, nullptr);
   tracepoint(rgw_op, processor_complete_exit, s->req_id.c_str());
 
-  // only atomic upload will upate version_id here
-  if (!multipart) 
-    version_id = (static_cast<RGWPutObjProcessor_Atomic *>(processor.get()))->get_version_id();
-
   /* produce torrent */
   if (s->cct->_conf->rgw_torrent_flag && (ofs == torrent.get_data_len()))
   {
index 693f82325e23a2b6189e2ed164115e107b0a405b..00d1978f28cfa0bdd88ecd81a9c9e1cb1b13cf91 100644 (file)
@@ -37,6 +37,7 @@
 #include "rgw_acl.h"
 #include "rgw_cors.h"
 #include "rgw_quota.h"
+#include "rgw_putobj.h"
 
 #include "rgw_lc.h"
 #include "rgw_torrent.h"
@@ -1074,9 +1075,9 @@ public:
     *filter = nullptr;
     return 0;
   }
-  virtual int get_encrypt_filter(std::unique_ptr<RGWPutObjDataProcessor> *filter, RGWPutObjDataProcessor* cb) {
-     *filter = nullptr;
-     return 0;
+  virtual int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+                                 rgw::putobj::DataProcessor *cb) {
+    return 0;
   }
 
   int get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len);
@@ -1147,8 +1148,8 @@ public:
   void pre_exec() override;
   void execute() override;
 
-  virtual int get_encrypt_filter(std::unique_ptr<RGWPutObjDataProcessor> *filter, RGWPutObjDataProcessor* cb) {
-    *filter = nullptr;
+  virtual int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+                                 rgw::putobj::DataProcessor *cb) {
     return 0;
   }
   virtual int get_params() = 0;
index e5fa7b09265d45ebab7568f3c27f91cfd2dec554..1aa5be329c915c8f3b080a2ca807eb4d4451373d 100644 (file)
@@ -1538,34 +1538,24 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter(
 }
 
 int RGWPutObj_ObjStore_S3::get_encrypt_filter(
-    std::unique_ptr<RGWPutObjDataProcessor>* filter,
-    RGWPutObjDataProcessor* cb)
+    std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+    rgw::putobj::DataProcessor *cb)
 {
   int res = 0;
-  RGWPutObjProcessor_Multipart* multi_processor=dynamic_cast<RGWPutObjProcessor_Multipart*>(cb);
-  if (multi_processor != nullptr) {
-    RGWMPObj* mp = nullptr;
-    multi_processor->get_mp(&mp);
-    if (mp != nullptr) {
-      map<string, bufferlist> xattrs;
-      string meta_oid;
-      meta_oid = mp->get_meta();
-
-      rgw_obj obj;
-      obj.init_ns(s->bucket, meta_oid, RGW_OBJ_NS_MULTIPART);
-      obj.set_in_extra_data(true);
-      res = get_obj_attrs(store, s, obj, xattrs);
-      if (res == 0) {
-        std::unique_ptr<BlockCrypt> block_crypt;
-        /* We are adding to existing object.
-         * We use crypto mode that configured as if we were decrypting. */
-        res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses);
-#if 0
-        if (res == 0 && block_crypt != nullptr)
-          *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
-              new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
-#endif
-      }
+  if (!multipart_upload_id.empty()) {
+    RGWMPObj mp(s->object.name, multipart_upload_id);
+    rgw_obj obj;
+    obj.init_ns(s->bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
+    obj.set_in_extra_data(true);
+    map<string, bufferlist> xattrs;
+    res = get_obj_attrs(store, s, obj, xattrs);
+    if (res == 0) {
+      std::unique_ptr<BlockCrypt> block_crypt;
+      /* We are adding to existing object.
+       * We use crypto mode that configured as if we were decrypting. */
+      res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses);
+      if (res == 0 && block_crypt != nullptr)
+        filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
     }
     /* it is ok, to not have encryption at all */
   }
@@ -1573,12 +1563,9 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter(
   {
     std::unique_ptr<BlockCrypt> block_crypt;
     res = rgw_s3_prepare_encrypt(s, attrs, nullptr, &block_crypt, crypt_http_responses);
-#if 0
     if (res == 0 && block_crypt != nullptr) {
-      *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
-          new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
+      filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
     }
-#endif
   }
   return res;
 }
@@ -2084,19 +2071,15 @@ done:
 }
 
 int RGWPostObj_ObjStore_S3::get_encrypt_filter(
-    std::unique_ptr<RGWPutObjDataProcessor>* filter, RGWPutObjDataProcessor* cb)
+    std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+    rgw::putobj::DataProcessor *cb)
 {
-  int res = 0;
   std::unique_ptr<BlockCrypt> block_crypt;
-  res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt, crypt_http_responses);
-#if 0
+  int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt,
+                                   crypt_http_responses);
   if (res == 0 && block_crypt != nullptr) {
-    *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
-        new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
+    filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
   }
-  else
-    *filter = nullptr;
-#endif
   return res;
 }
 
index 7149dcb3765ba041dec18658828c01d7cc178a1f..1decd9bf59ac617316dfe0e6e49a5840683cc3a5 100644 (file)
@@ -214,8 +214,8 @@ public:
   int get_data(bufferlist& bl) override;
   void send_response() override;
 
-  int get_encrypt_filter(std::unique_ptr<RGWPutObjDataProcessor>* filter,
-                         RGWPutObjDataProcessor* cb) override;
+  int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+                         rgw::putobj::DataProcessor *cb) override;
   int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
                          RGWGetObj_Filter* cb,
                          map<string, bufferlist>& attrs,
@@ -253,8 +253,8 @@ public:
 
   void send_response() override;
   int get_data(ceph::bufferlist& bl, bool& again) override;
-  int get_encrypt_filter(std::unique_ptr<RGWPutObjDataProcessor>* filter,
-                         RGWPutObjDataProcessor* cb) override;
+  int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+                         rgw::putobj::DataProcessor *cb) override;
 };
 
 class RGWDeleteObj_ObjStore_S3 : public RGWDeleteObj_ObjStore {