perfcounter->inc(l_rgw_put);
op_ret = -EINVAL;
+ rgw_obj obj{s->bucket, s->object};
if (s->object.empty()) {
ldout(s->cct, 0) << __func__ << " called on empty object" << dendl;
/* early quota check skipped--we don't have size yet */
/* skipping user-supplied etag--we might have one in future, but
* like data it and other attrs would arrive after open */
- processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
- &multipart);
- op_ret = processor->prepare(get_store(), NULL);
+
+ aio.emplace(s->cct->_conf->rgw_put_obj_min_window_size);
+
+ if (s->bucket_info.versioning_enabled()) {
+ if (!version_id.empty()) {
+ obj.key.set_instance(version_id);
+ } else {
+ get_store()->gen_rand_obj_instance_name(&obj);
+ version_id = obj.key.instance;
+ }
+ }
+ processor.emplace(&*aio, get_store(), s->bucket_info,
+ s->bucket_owner.get_id(),
+ *static_cast<RGWObjectCtx *>(s->obj_ctx),
+ obj, olh_epoch, s->req_id);
+
+ op_ret = processor->prepare();
if (op_ret < 0) {
ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
<< dendl;
goto done;
}
-
- filter = processor;
-#if 0
+ filter = &*processor;
if (compression_type != "none") {
plugin = Compressor::create(s->cct, compression_type);
if (! plugin) {
filter = &*compressor;
}
}
-#endif
done:
return op_ret;
if (! len)
return 0;
- /* XXX we are currently synchronous--supplied data buffers cannot
- * be used after the caller returns */
- bool need_to_wait = true;
- bufferlist orig_data;
-
- if (need_to_wait) {
- orig_data = data;
- }
hash.Update((const unsigned char *)data.c_str(), data.length());
- op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
+ op_ret = filter->process(std::move(data), ofs);
if (op_ret < 0) {
- if (!need_to_wait || op_ret != -EEXIST) {
- ldout(s->cct, 20) << "processor->thottle_data() returned ret="
- << op_ret << dendl;
- goto done;
- }
-
- ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
-
- /* restore original data */
- data.swap(orig_data);
-
- /* restart processing with different oid suffix */
- dispose_processor(processor);
- processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
- &multipart);
- filter = processor;
-
- string oid_rand;
- char buf[33];
- gen_rand_alphanumeric(get_store()->ctx(), buf, sizeof(buf) - 1);
- oid_rand.append(buf);
-
- op_ret = processor->prepare(get_store(), &oid_rand);
- if (op_ret < 0) {
- ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
- << op_ret << dendl;
- goto done;
- }
-#if 0
- /* restore compression filter, if any */
- if (compressor) {
- compressor.emplace(s->cct, plugin, filter);
- filter = &*compressor;
- }
-#endif
- op_ret = put_data_and_throttle(filter, data, ofs, false);
- if (op_ret < 0) {
- goto done;
- }
+ goto done;
}
bytes_written += len;
s->obj_size = bytes_written;
perfcounter->inc(l_rgw_put_b, s->obj_size);
+ // flush data in filters
+ op_ret = filter->process({}, s->obj_size);
+ if (op_ret < 0) {
+ goto done;
+ }
+
op_ret = get_store()->check_quota(s->bucket_owner.get_id(), s->bucket,
user_quota, bucket_quota, s->obj_size);
if (op_ret < 0) {
op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
(delete_at ? *delete_at : real_time()),
- if_match, if_nomatch);
+ if_match, if_nomatch, nullptr, nullptr, nullptr);
if (op_ret != 0) {
/* revert attr updates */
rgw_fh->set_mtime(omtime);
}
done:
- dispose_processor(processor);
perfcounter->tinc(l_rgw_put_lat, s->time_elapsed());
return op_ret;
} /* exec_finish */
#include "rgw_lib.h"
#include "rgw_ldap.h"
#include "rgw_token.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_putobj_throttle.h"
#include "rgw_compression.h"
const std::string& bucket_name;
const std::string& obj_name;
RGWFileHandle* rgw_fh;
- RGWPutObjProcessor* processor;
- RGWPutObjDataProcessor* filter;
+ std::optional<rgw::putobj::AioThrottle> aio;
+ std::optional<rgw::putobj::AtomicObjectProcessor> processor;
+ rgw::putobj::DataProcessor* filter;
boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;
buffer::list data;
MD5 hash;
off_t real_ofs;
size_t bytes_written;
- bool multipart;
bool eio;
RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
const std::string& _bname, const std::string& _oname)
: RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
- rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
- bytes_written(0), multipart(false), eio(false) {
+ rgw_fh(_fh), filter(nullptr), real_ofs(0),
+ bytes_written(0), eio(false) {
int ret = header_init();
if (ret == 0) {
return 0;
}
- RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx,
- bool *is_multipart) override {
- struct req_state* s = get_state();
- uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
- RGWPutObjProcessor_Atomic *processor =
- new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket,
- s->object.name, part_size, s->req_id,
- s->bucket_info.versioning_enabled());
- processor->set_olh_epoch(olh_epoch);
- processor->set_version_id(version_id);
- return processor;
- }
-
int get_params() override {
struct req_state* s = get_state();
RGWAccessControlPolicy_S3 s3policy(s->cct);