From: Casey Bodley Date: Wed, 10 Oct 2018 19:54:18 +0000 (-0400) Subject: rgw: PutObj::execute() uses new ObjectProcessors X-Git-Tag: v14.1.0~1156^2~9 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=47b1754e474241a6c0ed3519ab5e3a041c9559fc;p=ceph.git rgw: PutObj::execute() uses new ObjectProcessors Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index fc913222f22cd..28c816e9d491b 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -21,6 +21,7 @@ #include "common/mime.h" #include "common/utf8.h" #include "common/ceph_json.h" +#include "common/static_ptr.h" #include "rgw_rados.h" #include "rgw_op.h" @@ -42,6 +43,8 @@ #include "rgw_compression.h" #include "rgw_role.h" #include "rgw_tag_s3.h" +#include "rgw_putobj_processor.h" +#include "rgw_putobj_throttle.h" #include "cls/lock/cls_lock_client.h" #include "cls/rgw/cls_rgw_client.h" @@ -3536,9 +3539,6 @@ static CompressorRef get_compressor_plugin(const req_state *s, void RGWPutObj::execute() { - std::unique_ptr processor; - RGWPutObjDataProcessor *filter = nullptr; - std::unique_ptr encrypt; char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1]; char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; @@ -3546,14 +3546,9 @@ void RGWPutObj::execute() MD5 hash; bufferlist bl, aclbl, bs; int len; - bool multipart; off_t fst; off_t lst; - const auto& compression_type = store->get_zone_params().get_compression_type( - s->bucket_info.placement_rule); - CompressorRef plugin; - boost::optional compressor; bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL); perfcounter->inc(l_rgw_put); @@ -3616,15 +3611,12 @@ void RGWPutObj::execute() supplied_md5[sizeof(supplied_md5) - 1] = '\0'; } + const bool multipart = !multipart_upload_id.empty(); auto& obj_ctx = *static_cast(s->obj_ctx); - processor.reset(select_processor(obj_ctx, &multipart)); - - // no filters by default - filter = processor.get(); + rgw_obj obj{s->bucket, s->object}; /* Handle object versioning of Swift API. */ if (! multipart) { - rgw_obj obj(s->bucket, s->object); op_ret = store->swift_versioning_copy(obj_ctx, s->bucket_owner.get_id(), s->bucket_info, @@ -3634,7 +3626,32 @@ void RGWPutObj::execute() } } - op_ret = processor->prepare(store, NULL); + // create the object processor + using namespace rgw::putobj; + AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); + constexpr auto max_processor_size = std::max(sizeof(MultipartObjectProcessor), + sizeof(AtomicObjectProcessor)); + ceph::static_ptr processor; + + if (multipart) { + processor.emplace( + &aio, store, s->bucket_info, s->owner.get_id(), obj_ctx, obj, + multipart_upload_id, multipart_part_num, multipart_part_str); + } else { + if (s->bucket_info.versioning_enabled()) { + if (!version_id.empty()) { + obj.key.set_instance(version_id); + } else { + store->gen_rand_obj_instance_name(&obj); + version_id = obj.key.instance; + } + } + processor.emplace( + &aio, store, s->bucket_info, s->bucket_owner.get_id(), + obj_ctx, obj, olh_epoch, s->req_id); + } + + op_ret = processor->prepare(); if (op_ret < 0) { ldpp_dout(this, 20) << "processor->prepare() returned ret=" << op_ret << dendl; @@ -3662,27 +3679,32 @@ void RGWPutObj::execute() } fst = copy_source_range_fst; -#if 0 + + // no filters by default + DataProcessor *filter = processor.get(); + + const auto& compression_type = store->get_zone_params().get_compression_type( + s->bucket_info.placement_rule); + CompressorRef plugin; + boost::optional compressor; + + std::unique_ptr encrypt; op_ret = get_encrypt_filter(&encrypt, filter); if (op_ret < 0) { return; } if (encrypt != nullptr) { - filter = encrypt.get(); - } else { - //no encryption, we can try compression - if (compression_type != "none") { - plugin = get_compressor_plugin(s, compression_type); - if (!plugin) { - ldpp_dout(this, 1) << "Cannot load plugin for compression type " - << compression_type << dendl; - } else { - compressor.emplace(s->cct, plugin, filter); - filter = &*compressor; - } + filter = &*encrypt; + } else if (compression_type != "none") { + plugin = get_compressor_plugin(s, compression_type); + if (!plugin) { + ldpp_dout(this, 1) << "Cannot load plugin for compression type " + << compression_type << dendl; + } else { + compressor.emplace(s->cct, plugin, filter); + filter = &*compressor; } } -#endif tracepoint(rgw_op, before_data_transfer, s->req_id.c_str()); do { bufferlist data; @@ -3703,6 +3725,8 @@ void RGWPutObj::execute() op_ret = len; ldpp_dout(this, 20) << "get_data() returned ret=" << op_ret << dendl; return; + } else if (len == 0) { + break; } if (need_calc_md5) { @@ -3712,75 +3736,21 @@ void RGWPutObj::execute() /* update torrrent */ torrent.update(data); - /* do we need this operation to be synchronous? if we're dealing with an object with immutable - * head, e.g., multipart object we need to make sure we're the first one writing to this object - */ - bool need_to_wait = (ofs == 0) && multipart; - - bufferlist orig_data; - - if (need_to_wait) { - orig_data = data; - } - - op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait); + op_ret = filter->process(std::move(data), ofs); if (op_ret < 0) { - if (op_ret != -EEXIST) { - ldpp_dout(this, 20) << "processor->thottle_data() returned ret=" - << op_ret << dendl; - return; - } - /* need_to_wait == true and op_ret == -EEXIST */ - ldpp_dout(this, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl; - - /* restore original data */ - data.swap(orig_data); - - /* restart processing with different oid suffix */ - processor.reset(select_processor(obj_ctx, &multipart)); - filter = processor.get(); - - string oid_rand; - char buf[33]; - gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); - oid_rand.append(buf); - - op_ret = processor->prepare(store, &oid_rand); - if (op_ret < 0) { - ldpp_dout(this, 0) << "ERROR: processor->prepare() returned " - << op_ret << dendl; - return; - } -#if 0 - op_ret = get_encrypt_filter(&encrypt, filter); - if (op_ret < 0) { - return; - } - if (encrypt != nullptr) { - filter = encrypt.get(); - } else { - if (compressor) { - compressor.emplace(s->cct, plugin, filter); - filter = &*compressor; - } - } -#endif - op_ret = put_data_and_throttle(filter, data, ofs, false); - if (op_ret < 0) { - return; - } + ldpp_dout(this, 20) << "processor->process() returned ret=" + << op_ret << dendl; + return; } ofs += len; } while (len > 0); tracepoint(rgw_op, after_data_transfer, s->req_id.c_str(), ofs); - { - bufferlist flush; - op_ret = put_data_and_throttle(filter, flush, ofs, false); - if (op_ret < 0) { - return; - } + // flush any data in filters + op_ret = filter->process({}, ofs); + if (op_ret < 0) { + return; } if (!chunked_upload && ofs != s->content_length) { @@ -3884,13 +3854,9 @@ void RGWPutObj::execute() tracepoint(rgw_op, processor_complete_enter, s->req_id.c_str()); op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs, (delete_at ? *delete_at : real_time()), if_match, if_nomatch, - (user_data.empty() ? nullptr : &user_data)); + (user_data.empty() ? nullptr : &user_data), nullptr, nullptr); tracepoint(rgw_op, processor_complete_exit, s->req_id.c_str()); - // only atomic upload will upate version_id here - if (!multipart) - version_id = (static_cast(processor.get()))->get_version_id(); - /* produce torrent */ if (s->cct->_conf->rgw_torrent_flag && (ofs == torrent.get_data_len())) { diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 693f82325e23a..00d1978f28cfa 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -37,6 +37,7 @@ #include "rgw_acl.h" #include "rgw_cors.h" #include "rgw_quota.h" +#include "rgw_putobj.h" #include "rgw_lc.h" #include "rgw_torrent.h" @@ -1074,9 +1075,9 @@ public: *filter = nullptr; return 0; } - virtual int get_encrypt_filter(std::unique_ptr *filter, RGWPutObjDataProcessor* cb) { - *filter = nullptr; - return 0; + virtual int get_encrypt_filter(std::unique_ptr *filter, + rgw::putobj::DataProcessor *cb) { + return 0; } int get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len); @@ -1147,8 +1148,8 @@ public: void pre_exec() override; void execute() override; - virtual int get_encrypt_filter(std::unique_ptr *filter, RGWPutObjDataProcessor* cb) { - *filter = nullptr; + virtual int get_encrypt_filter(std::unique_ptr *filter, + rgw::putobj::DataProcessor *cb) { return 0; } virtual int get_params() = 0; diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index e5fa7b09265d4..1aa5be329c915 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -1538,34 +1538,24 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter( } int RGWPutObj_ObjStore_S3::get_encrypt_filter( - std::unique_ptr* filter, - RGWPutObjDataProcessor* cb) + std::unique_ptr *filter, + rgw::putobj::DataProcessor *cb) { int res = 0; - RGWPutObjProcessor_Multipart* multi_processor=dynamic_cast(cb); - if (multi_processor != nullptr) { - RGWMPObj* mp = nullptr; - multi_processor->get_mp(&mp); - if (mp != nullptr) { - map xattrs; - string meta_oid; - meta_oid = mp->get_meta(); - - rgw_obj obj; - obj.init_ns(s->bucket, meta_oid, RGW_OBJ_NS_MULTIPART); - obj.set_in_extra_data(true); - res = get_obj_attrs(store, s, obj, xattrs); - if (res == 0) { - std::unique_ptr block_crypt; - /* We are adding to existing object. - * We use crypto mode that configured as if we were decrypting. */ - res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses); -#if 0 - if (res == 0 && block_crypt != nullptr) - *filter = std::unique_ptr( - new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt))); -#endif - } + if (!multipart_upload_id.empty()) { + RGWMPObj mp(s->object.name, multipart_upload_id); + rgw_obj obj; + obj.init_ns(s->bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART); + obj.set_in_extra_data(true); + map xattrs; + res = get_obj_attrs(store, s, obj, xattrs); + if (res == 0) { + std::unique_ptr block_crypt; + /* We are adding to existing object. + * We use crypto mode that configured as if we were decrypting. */ + res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses); + if (res == 0 && block_crypt != nullptr) + filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt))); } /* it is ok, to not have encryption at all */ } @@ -1573,12 +1563,9 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter( { std::unique_ptr block_crypt; res = rgw_s3_prepare_encrypt(s, attrs, nullptr, &block_crypt, crypt_http_responses); -#if 0 if (res == 0 && block_crypt != nullptr) { - *filter = std::unique_ptr( - new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt))); + filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt))); } -#endif } return res; } @@ -2084,19 +2071,15 @@ done: } int RGWPostObj_ObjStore_S3::get_encrypt_filter( - std::unique_ptr* filter, RGWPutObjDataProcessor* cb) + std::unique_ptr *filter, + rgw::putobj::DataProcessor *cb) { - int res = 0; std::unique_ptr block_crypt; - res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt, crypt_http_responses); -#if 0 + int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt, + crypt_http_responses); if (res == 0 && block_crypt != nullptr) { - *filter = std::unique_ptr( - new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt))); + filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt))); } - else - *filter = nullptr; -#endif return res; } diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index 7149dcb3765ba..1decd9bf59ac6 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -214,8 +214,8 @@ public: int get_data(bufferlist& bl) override; void send_response() override; - int get_encrypt_filter(std::unique_ptr* filter, - RGWPutObjDataProcessor* cb) override; + int get_encrypt_filter(std::unique_ptr *filter, + rgw::putobj::DataProcessor *cb) override; int get_decrypt_filter(std::unique_ptr* filter, RGWGetObj_Filter* cb, map& attrs, @@ -253,8 +253,8 @@ public: void send_response() override; int get_data(ceph::bufferlist& bl, bool& again) override; - int get_encrypt_filter(std::unique_ptr* filter, - RGWPutObjDataProcessor* cb) override; + int get_encrypt_filter(std::unique_ptr *filter, + rgw::putobj::DataProcessor *cb) override; }; class RGWDeleteObj_ObjStore_S3 : public RGWDeleteObj_ObjStore {