From: Casey Bodley Date: Wed, 10 Oct 2018 19:54:30 +0000 (-0400) Subject: rgw: PostObj uses AtomicObjectProcessor X-Git-Tag: v14.1.0~1156^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ee8bc41329b40067d87954ead8f81c2c98b23bd8;p=ceph.git rgw: PostObj uses AtomicObjectProcessor Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 28c816e9d491..ec784713f278 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3883,7 +3883,6 @@ void RGWPostObj::pre_exec() void RGWPostObj::execute() { - RGWPutObjDataProcessor *filter = nullptr; boost::optional compressor; CompressorRef plugin; char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; @@ -3926,7 +3925,6 @@ void RGWPostObj::execute() /* Start iteration over data fields. It's necessary as Swift's FormPost * is capable to handle multiple files in single form. */ do { - std::unique_ptr encrypt; char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; MD5 hash; @@ -3962,23 +3960,26 @@ void RGWPostObj::execute() ldpp_dout(this, 15) << "supplied_md5=" << supplied_md5 << dendl; } - RGWPutObjProcessor_Atomic processor(*static_cast(s->obj_ctx), - s->bucket_info, - s->bucket, - get_current_filename(), - /* part size */ - s->cct->_conf->rgw_obj_stripe_size, - s->req_id, - s->bucket_info.versioning_enabled()); - processor.set_olh_epoch(0); - /* No filters by default. */ - filter = &processor; + rgw_obj obj(s->bucket, get_current_filename()); + if (s->bucket_info.versioning_enabled()) { + store->gen_rand_obj_instance_name(&obj); + } - op_ret = processor.prepare(store, nullptr); + using namespace rgw::putobj; + AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size); + AtomicObjectProcessor processor(&aio, store, s->bucket_info, + s->bucket_owner.get_id(), + *static_cast(s->obj_ctx), + obj, 0, s->req_id); + op_ret = processor.prepare(); if (op_ret < 0) { return; } -#if 0 + + /* No filters by default. */ + DataProcessor *filter = &processor; + + std::unique_ptr encrypt; op_ret = get_encrypt_filter(&encrypt, filter); if (op_ret < 0) { return; @@ -3999,7 +4000,7 @@ void RGWPostObj::execute() } } } -#endif + bool again; do { ceph::bufferlist data; @@ -4015,7 +4016,7 @@ void RGWPostObj::execute() } hash.Update((const unsigned char *)data.c_str(), data.length()); - op_ret = put_data_and_throttle(filter, data, ofs, false); + op_ret = filter->process(std::move(data), ofs); ofs += len; @@ -4025,9 +4026,10 @@ void RGWPostObj::execute() } } while (again); - { - bufferlist flush; - op_ret = put_data_and_throttle(filter, flush, ofs, false); + // flush + op_ret = filter->process({}, ofs); + if (op_ret < 0) { + return; } if (len < min_len) { @@ -4082,8 +4084,12 @@ void RGWPostObj::execute() emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp)); } - op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), - attrs, (delete_at ? *delete_at : real_time())); + op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), attrs, + (delete_at ? *delete_at : real_time()), + nullptr, nullptr, nullptr, nullptr, nullptr); + if (op_ret < 0) { + return; + } } while (is_next_file_to_upload()); }