From dd472ab0cfd024f6923fdcd49b427dd7c0604a2a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Oct 2018 15:54:22 -0400 Subject: [PATCH] rgw: rgw_file uses AtomicObjectProcessor Signed-off-by: Casey Bodley --- src/rgw/rgw_file.cc | 85 +++++++++++++++------------------------------ src/rgw/rgw_file.h | 25 ++++--------- 2 files changed, 35 insertions(+), 75 deletions(-) diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index ec20ff962ecc0..702f9504f6b80 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -1295,6 +1295,7 @@ namespace rgw { perfcounter->inc(l_rgw_put); op_ret = -EINVAL; + rgw_obj obj{s->bucket, s->object}; if (s->object.empty()) { ldout(s->cct, 0) << __func__ << " called on empty object" << dendl; @@ -1314,17 +1315,29 @@ namespace rgw { /* early quota check skipped--we don't have size yet */ /* skipping user-supplied etag--we might have one in future, but * like data it and other attrs would arrive after open */ - processor = select_processor(*static_cast(s->obj_ctx), - &multipart); - op_ret = processor->prepare(get_store(), NULL); + + aio.emplace(s->cct->_conf->rgw_put_obj_min_window_size); + + if (s->bucket_info.versioning_enabled()) { + if (!version_id.empty()) { + obj.key.set_instance(version_id); + } else { + get_store()->gen_rand_obj_instance_name(&obj); + version_id = obj.key.instance; + } + } + processor.emplace(&*aio, get_store(), s->bucket_info, + s->bucket_owner.get_id(), + *static_cast(s->obj_ctx), + obj, olh_epoch, s->req_id); + + op_ret = processor->prepare(); if (op_ret < 0) { ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret << dendl; goto done; } - - filter = processor; -#if 0 + filter = &*processor; if (compression_type != "none") { plugin = Compressor::create(s->cct, compression_type); if (! plugin) { @@ -1335,7 +1348,6 @@ namespace rgw { filter = &*compressor; } } -#endif done: return op_ret; @@ -1359,56 +1371,10 @@ namespace rgw { if (! len) return 0; - /* XXX we are currently synchronous--supplied data buffers cannot - * be used after the caller returns */ - bool need_to_wait = true; - bufferlist orig_data; - - if (need_to_wait) { - orig_data = data; - } hash.Update((const unsigned char *)data.c_str(), data.length()); - op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait); + op_ret = filter->process(std::move(data), ofs); if (op_ret < 0) { - if (!need_to_wait || op_ret != -EEXIST) { - ldout(s->cct, 20) << "processor->thottle_data() returned ret=" - << op_ret << dendl; - goto done; - } - - ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl; - - /* restore original data */ - data.swap(orig_data); - - /* restart processing with different oid suffix */ - dispose_processor(processor); - processor = select_processor(*static_cast(s->obj_ctx), - &multipart); - filter = processor; - - string oid_rand; - char buf[33]; - gen_rand_alphanumeric(get_store()->ctx(), buf, sizeof(buf) - 1); - oid_rand.append(buf); - - op_ret = processor->prepare(get_store(), &oid_rand); - if (op_ret < 0) { - ldout(s->cct, 0) << "ERROR: processor->prepare() returned " - << op_ret << dendl; - goto done; - } -#if 0 - /* restore compression filter, if any */ - if (compressor) { - compressor.emplace(s->cct, plugin, filter); - filter = &*compressor; - } -#endif - op_ret = put_data_and_throttle(filter, data, ofs, false); - if (op_ret < 0) { - goto done; - } + goto done; } bytes_written += len; @@ -1432,6 +1398,12 @@ namespace rgw { s->obj_size = bytes_written; perfcounter->inc(l_rgw_put_b, s->obj_size); + // flush data in filters + op_ret = filter->process({}, s->obj_size); + if (op_ret < 0) { + goto done; + } + op_ret = get_store()->check_quota(s->bucket_owner.get_id(), s->bucket, user_quota, bucket_quota, s->obj_size); if (op_ret < 0) { @@ -1503,7 +1475,7 @@ namespace rgw { op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs, (delete_at ? *delete_at : real_time()), - if_match, if_nomatch); + if_match, if_nomatch, nullptr, nullptr, nullptr); if (op_ret != 0) { /* revert attr updates */ rgw_fh->set_mtime(omtime); @@ -1512,7 +1484,6 @@ namespace rgw { } done: - dispose_processor(processor); perfcounter->tinc(l_rgw_put_lat, s->time_elapsed()); return op_ret; } /* exec_finish */ diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index 87bd33ac3443a..a98d2832e93f4 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -34,6 +34,8 @@ #include "rgw_lib.h" #include "rgw_ldap.h" #include "rgw_token.h" +#include "rgw_putobj_processor.h" +#include "rgw_putobj_throttle.h" #include "rgw_compression.h" @@ -2289,8 +2291,9 @@ public: const std::string& bucket_name; const std::string& obj_name; RGWFileHandle* rgw_fh; - RGWPutObjProcessor* processor; - RGWPutObjDataProcessor* filter; + std::optional aio; + std::optional processor; + rgw::putobj::DataProcessor* filter; boost::optional compressor; CompressorRef plugin; buffer::list data; @@ -2298,14 +2301,13 @@ public: MD5 hash; off_t real_ofs; size_t bytes_written; - bool multipart; bool eio; RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh, const std::string& _bname, const std::string& _oname) : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname), - rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0), - bytes_written(0), multipart(false), eio(false) { + rgw_fh(_fh), filter(nullptr), real_ofs(0), + bytes_written(0), eio(false) { int ret = header_init(); if (ret == 0) { @@ -2348,19 +2350,6 @@ public: return 0; } - RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, - bool *is_multipart) override { - struct req_state* s = get_state(); - uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size; - RGWPutObjProcessor_Atomic *processor = - new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, - s->object.name, part_size, s->req_id, - s->bucket_info.versioning_enabled()); - processor->set_olh_epoch(olh_epoch); - processor->set_version_id(version_id); - return processor; - } - int get_params() override { struct req_state* s = get_state(); RGWAccessControlPolicy_S3 s3policy(s->cct); -- 2.39.5