]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: api adjustment following rebase
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 19 Oct 2018 00:18:37 +0000 (17:18 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 09:00:21 +0000 (01:00 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_op.cc
src/rgw/rgw_tools.cc

index b1794d22d14fa6278f8d9c56dfbd9da60d34bcbb..eab7fe7559f8a19d9c1b1274f581f9d3cf0e6401 100644 (file)
@@ -46,6 +46,7 @@
 #include "rgw_role.h"
 #include "rgw_tag_s3.h"
 #include "rgw_putobj_processor.h"
+#include "rgw_crypt.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
index 8b7b56429498b2a5c70d51349e5ead7281e9f85e..ac6b744e4ced818f59500025361960a0bcfde6aa 100644 (file)
@@ -15,6 +15,9 @@
 #include "rgw_tools.h"
 #include "rgw_acl_s3.h"
 #include "rgw_op.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_aio_throttle.h"
+#include "rgw_compression.h"
 
 #include "services/svc_sys_obj.h"
 
@@ -335,23 +338,43 @@ int RGWDataAccess::Object::put(bufferlist& data,
 
   RGWBucketInfo& bucket_info = bucket->bucket_info;
 
-  RGWPutObjProcessor_Atomic processor(*sd->obj_ctx,
-                                      bucket_info,
-                                     bucket_info.bucket,
-                                     key.name,
-                                      cct->_conf->rgw_obj_stripe_size, tag,
-                                     bucket_info.versioning_enabled());
-  if (key.instance.empty()) {
-    processor.set_version_id(key.instance);
-  }
+  using namespace rgw::putobj;
+  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
 
-  if (olh_epoch) {
-    processor.set_olh_epoch(*olh_epoch);
-  }
-  int ret = processor.prepare(store, NULL);
+  RGWObjectCtx obj_ctx(store);
+  rgw_obj obj(bucket_info.bucket, key);
+
+  auto& owner = bucket->policy.get_owner();
+
+  string req_id = store->unique_id(store->get_new_req_id());
+
+  AtomicObjectProcessor processor(&aio, store, bucket_info,
+                                  owner.get_id(),
+                                  obj_ctx, obj, olh_epoch, req_id);
+
+  int ret = processor.prepare();
   if (ret < 0)
     return ret;
 
+  using namespace rgw::putobj;
+
+  DataProcessor *filter = &processor;
+
+  CompressorRef plugin;
+  boost::optional<RGWPutObj_Compress> compressor;
+
+  const auto& compression_type = store->get_zone_params().get_compression_type(bucket_info.placement_rule);
+  if (compression_type != "none") {
+    plugin = Compressor::create(store->ctx(), compression_type);
+    if (!plugin) {
+      ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
+        << compression_type << dendl;
+    } else {
+      compressor.emplace(store->ctx(), plugin, filter);
+      filter = &*compressor;
+    }
+  }
+
   off_t ofs = 0;
   auto obj_size = data.length();
 
@@ -365,24 +388,17 @@ int RGWDataAccess::Object::put(bufferlist& data,
     data.splice(0, read_len, &bl);
     etag_calc.update(bl);
 
-    bool again;
-
-    do {
-      void *handle;
-      rgw_raw_obj obj;
-
-      ret = processor.handle_data(bl, ofs, &handle, &obj, &again);
-      if (ret < 0) {
-        return ret;
-      }
-      ret = processor.throttle_data(handle, obj, read_len, false);
-      if (ret < 0)
-        return ret;
-    } while (again);
+    ret = filter->process(std::move(bl), ofs);
+    if (ret < 0)
+      return ret;
 
     ofs += read_len;
   } while (data.length() > 0);
 
+  ret = filter->process({}, ofs);
+  if (ret < 0) {
+    return ret;
+  }
   bool has_etag_attr = false;
   auto iter = attrs.find(RGW_ATTR_ETAG);
   if (iter != attrs.end()) {
@@ -419,7 +435,8 @@ int RGWDataAccess::Object::put(bufferlist& data,
                            &mtime, mtime,
                            attrs, delete_at,
                             nullptr, nullptr,
-                            puser_data);
+                            puser_data,
+                            nullptr, nullptr);
 }
 
 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)