ldpp_dout(this, 20) << "got file=" << path << ", size=" << size << dendl;
- RGWPutObjDataProcessor *filter = nullptr;
- boost::optional<RGWPutObj_Compress> compressor;
-
if (size > static_cast<size_t>(s->cct->_conf->rgw_max_put_size)) {
op_ret = -ERR_TOO_LARGE;
return op_ret;
return op_ret;
}
- RGWPutObjProcessor_Atomic processor(obj_ctx,
- binfo,
- binfo.bucket,
- object.name,
- /* part size */
- s->cct->_conf->rgw_obj_stripe_size,
- s->req_id,
- binfo.versioning_enabled());
+ rgw_obj obj(binfo.bucket, object);
+ if (s->bucket_info.versioning_enabled()) {
+ store->gen_rand_obj_instance_name(&obj);
+ }
- /* No filters by default. */
- filter = &processor;
+ using namespace rgw::putobj;
+ AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+
+ AtomicObjectProcessor processor(&aio, store, binfo, bowner.get_id(),
+ obj_ctx, obj, 0, s->req_id);
- op_ret = processor.prepare(store, nullptr);
+ op_ret = processor.prepare();
if (op_ret < 0) {
ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl;
return op_ret;
}
+ /* No filters by default. */
+ DataProcessor *filter = &processor;
+
const auto& compression_type = store->get_zone_params().get_compression_type(
binfo.placement_rule);
CompressorRef plugin;
-#if 0
+ boost::optional<RGWPutObj_Compress> compressor;
if (compression_type != "none") {
plugin = Compressor::create(s->cct, compression_type);
if (! plugin) {
filter = &*compressor;
}
}
-#endif
+
/* Upload file content. */
ssize_t len = 0;
size_t ofs = 0;
return op_ret;
} else if (len > 0) {
hash.Update((const unsigned char *)data.c_str(), data.length());
- op_ret = put_data_and_throttle(filter, data, ofs, false);
+ op_ret = filter->process(std::move(data), ofs);
if (op_ret < 0) {
- ldpp_dout(this, 20) << "processor->thottle_data() returned ret=" << op_ret << dendl;
+ ldpp_dout(this, 20) << "filter->process() returned ret=" << op_ret << dendl;
return op_ret;
}
} while (len > 0);
+ // flush
+ op_ret = filter->process({}, ofs);
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
if (ofs != size) {
ldpp_dout(this, 10) << "real file size different from declared" << dendl;
op_ret = -EINVAL;
+ return op_ret;
}
op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
}
/* Complete the transaction. */
- op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs,
- ceph::real_time() /* delete_at */);
+ op_ret = processor.complete(size, etag, nullptr, ceph::real_time(),
+ attrs, ceph::real_time() /* delete_at */,
+ nullptr, nullptr, nullptr, nullptr, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 20) << "processor::complete returned op_ret=" << op_ret << dendl;
}