#include "rgw_tools.h"
#include "rgw_acl_s3.h"
#include "rgw_op.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_aio_throttle.h"
+#include "rgw_compression.h"
#include "services/svc_sys_obj.h"
RGWBucketInfo& bucket_info = bucket->bucket_info;
- RGWPutObjProcessor_Atomic processor(*sd->obj_ctx,
- bucket_info,
- bucket_info.bucket,
- key.name,
- cct->_conf->rgw_obj_stripe_size, tag,
- bucket_info.versioning_enabled());
- if (key.instance.empty()) {
- processor.set_version_id(key.instance);
- }
+ using namespace rgw::putobj;
+ rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
- if (olh_epoch) {
- processor.set_olh_epoch(*olh_epoch);
- }
- int ret = processor.prepare(store, NULL);
+ RGWObjectCtx obj_ctx(store);
+ rgw_obj obj(bucket_info.bucket, key);
+
+ auto& owner = bucket->policy.get_owner();
+
+ string req_id = store->unique_id(store->get_new_req_id());
+
+ AtomicObjectProcessor processor(&aio, store, bucket_info,
+ owner.get_id(),
+ obj_ctx, obj, olh_epoch, req_id);
+
+ int ret = processor.prepare();
if (ret < 0)
return ret;
+ using namespace rgw::putobj;
+
+ DataProcessor *filter = &processor;
+
+ CompressorRef plugin;
+ boost::optional<RGWPutObj_Compress> compressor;
+
+ const auto& compression_type = store->get_zone_params().get_compression_type(bucket_info.placement_rule);
+ if (compression_type != "none") {
+ plugin = Compressor::create(store->ctx(), compression_type);
+ if (!plugin) {
+ ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(store->ctx(), plugin, filter);
+ filter = &*compressor;
+ }
+ }
+
off_t ofs = 0;
auto obj_size = data.length();
data.splice(0, read_len, &bl);
etag_calc.update(bl);
- bool again;
-
- do {
- void *handle;
- rgw_raw_obj obj;
-
- ret = processor.handle_data(bl, ofs, &handle, &obj, &again);
- if (ret < 0) {
- return ret;
- }
- ret = processor.throttle_data(handle, obj, read_len, false);
- if (ret < 0)
- return ret;
- } while (again);
+ ret = filter->process(std::move(bl), ofs);
+ if (ret < 0)
+ return ret;
ofs += read_len;
} while (data.length() > 0);
+ ret = filter->process({}, ofs);
+ if (ret < 0) {
+ return ret;
+ }
bool has_etag_attr = false;
auto iter = attrs.find(RGW_ATTR_ETAG);
if (iter != attrs.end()) {
&mtime, mtime,
attrs, delete_at,
nullptr, nullptr,
- puser_data);
+ puser_data,
+ nullptr, nullptr);
}
void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)