From: Yehuda Sadeh Date: Fri, 19 Oct 2018 00:18:37 +0000 (-0700) Subject: rgw: api adjustment following rebase X-Git-Tag: v14.1.0~616^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bee8ca9dbd3f8ec669d8d180f51b9487fa537688;p=ceph.git rgw: api adjustment following rebase Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index b1794d22d14f..eab7fe7559f8 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -46,6 +46,7 @@ #include "rgw_role.h" #include "rgw_tag_s3.h" #include "rgw_putobj_processor.h" +#include "rgw_crypt.h" #include "services/svc_zone.h" #include "services/svc_quota.h" diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 8b7b56429498..ac6b744e4ced 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -15,6 +15,9 @@ #include "rgw_tools.h" #include "rgw_acl_s3.h" #include "rgw_op.h" +#include "rgw_putobj_processor.h" +#include "rgw_aio_throttle.h" +#include "rgw_compression.h" #include "services/svc_sys_obj.h" @@ -335,23 +338,43 @@ int RGWDataAccess::Object::put(bufferlist& data, RGWBucketInfo& bucket_info = bucket->bucket_info; - RGWPutObjProcessor_Atomic processor(*sd->obj_ctx, - bucket_info, - bucket_info.bucket, - key.name, - cct->_conf->rgw_obj_stripe_size, tag, - bucket_info.versioning_enabled()); - if (key.instance.empty()) { - processor.set_version_id(key.instance); - } + using namespace rgw::putobj; + rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); - if (olh_epoch) { - processor.set_olh_epoch(*olh_epoch); - } - int ret = processor.prepare(store, NULL); + RGWObjectCtx obj_ctx(store); + rgw_obj obj(bucket_info.bucket, key); + + auto& owner = bucket->policy.get_owner(); + + string req_id = store->unique_id(store->get_new_req_id()); + + AtomicObjectProcessor processor(&aio, store, bucket_info, + owner.get_id(), + obj_ctx, obj, olh_epoch, req_id); + + int ret = processor.prepare(); if (ret < 0) return ret; + using namespace rgw::putobj; + + DataProcessor *filter = &processor; + + CompressorRef plugin; + boost::optional compressor; + + const auto& compression_type = store->get_zone_params().get_compression_type(bucket_info.placement_rule); + if (compression_type != "none") { + plugin = Compressor::create(store->ctx(), compression_type); + if (!plugin) { + ldout(store->ctx(), 1) << "Cannot load plugin for compression type " + << compression_type << dendl; + } else { + compressor.emplace(store->ctx(), plugin, filter); + filter = &*compressor; + } + } + off_t ofs = 0; auto obj_size = data.length(); @@ -365,24 +388,17 @@ int RGWDataAccess::Object::put(bufferlist& data, data.splice(0, read_len, &bl); etag_calc.update(bl); - bool again; - - do { - void *handle; - rgw_raw_obj obj; - - ret = processor.handle_data(bl, ofs, &handle, &obj, &again); - if (ret < 0) { - return ret; - } - ret = processor.throttle_data(handle, obj, read_len, false); - if (ret < 0) - return ret; - } while (again); + ret = filter->process(std::move(bl), ofs); + if (ret < 0) + return ret; ofs += read_len; } while (data.length() > 0); + ret = filter->process({}, ofs); + if (ret < 0) { + return ret; + } bool has_etag_attr = false; auto iter = attrs.find(RGW_ATTR_ETAG); if (iter != attrs.end()) { @@ -419,7 +435,8 @@ int RGWDataAccess::Object::put(bufferlist& data, &mtime, mtime, attrs, delete_at, nullptr, nullptr, - puser_data); + puser_data, + nullptr, nullptr); } void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)