From 8e030b35f1fb26fff69b6b4b0756829ad1c58d03 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Oct 2018 15:54:32 -0400 Subject: [PATCH] rgw: RGWBulkUploadOp uses AtomicObjectProcessor Signed-off-by: Casey Bodley --- src/rgw/rgw_op.cc | 47 +++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index ec784713f27..dacc324e872 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6688,9 +6688,6 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, ldpp_dout(this, 20) << "got file=" << path << ", size=" << size << dendl; - RGWPutObjDataProcessor *filter = nullptr; - boost::optional compressor; - if (size > static_cast(s->cct->_conf->rgw_max_put_size)) { op_ret = -ERR_TOO_LARGE; return op_ret; @@ -6731,28 +6728,30 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, return op_ret; } - RGWPutObjProcessor_Atomic processor(obj_ctx, - binfo, - binfo.bucket, - object.name, - /* part size */ - s->cct->_conf->rgw_obj_stripe_size, - s->req_id, - binfo.versioning_enabled()); + rgw_obj obj(binfo.bucket, object); + if (s->bucket_info.versioning_enabled()) { + store->gen_rand_obj_instance_name(&obj); + } - /* No filters by default. */ - filter = &processor; + using namespace rgw::putobj; + AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); + + AtomicObjectProcessor processor(&aio, store, binfo, bowner.get_id(), + obj_ctx, obj, 0, s->req_id); - op_ret = processor.prepare(store, nullptr); + op_ret = processor.prepare(); if (op_ret < 0) { ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl; return op_ret; } + /* No filters by default. */ + DataProcessor *filter = &processor; + const auto& compression_type = store->get_zone_params().get_compression_type( binfo.placement_rule); CompressorRef plugin; -#if 0 + boost::optional compressor; if (compression_type != "none") { plugin = Compressor::create(s->cct, compression_type); if (! plugin) { @@ -6763,7 +6762,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, filter = &*compressor; } } -#endif + /* Upload file content. */ ssize_t len = 0; size_t ofs = 0; @@ -6778,9 +6777,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, return op_ret; } else if (len > 0) { hash.Update((const unsigned char *)data.c_str(), data.length()); - op_ret = put_data_and_throttle(filter, data, ofs, false); + op_ret = filter->process(std::move(data), ofs); if (op_ret < 0) { - ldpp_dout(this, 20) << "processor->thottle_data() returned ret=" << op_ret << dendl; + ldpp_dout(this, 20) << "filter->process() returned ret=" << op_ret << dendl; return op_ret; } @@ -6789,9 +6788,16 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, } while (len > 0); + // flush + op_ret = filter->process({}, ofs); + if (op_ret < 0) { + return op_ret; + } + if (ofs != size) { ldpp_dout(this, 10) << "real file size different from declared" << dendl; op_ret = -EINVAL; + return op_ret; } op_ret = store->check_quota(bowner.get_id(), binfo.bucket, @@ -6837,8 +6843,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, } /* Complete the transaction. */ - op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs, - ceph::real_time() /* delete_at */); + op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), + attrs, ceph::real_time() /* delete_at */, + nullptr, nullptr, nullptr, nullptr, nullptr); if (op_ret < 0) { ldpp_dout(this, 20) << "processor::complete returned op_ret=" << op_ret << dendl; } -- 2.39.5