#include "common/mime.h"
#include "common/utf8.h"
#include "common/ceph_json.h"
+#include "common/static_ptr.h"
#include "rgw_rados.h"
#include "rgw_op.h"
#include "rgw_compression.h"
#include "rgw_role.h"
#include "rgw_tag_s3.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_putobj_throttle.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"
void RGWPutObj::execute()
{
- std::unique_ptr<RGWPutObjProcessor> processor;
- RGWPutObjDataProcessor *filter = nullptr;
- std::unique_ptr<RGWPutObjDataProcessor> encrypt;
char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
MD5 hash;
bufferlist bl, aclbl, bs;
int len;
- bool multipart;
off_t fst;
off_t lst;
- const auto& compression_type = store->get_zone_params().get_compression_type(
- s->bucket_info.placement_rule);
- CompressorRef plugin;
- boost::optional<RGWPutObj_Compress> compressor;
bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
perfcounter->inc(l_rgw_put);
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}
+ const bool multipart = !multipart_upload_id.empty();
auto& obj_ctx = *static_cast<RGWObjectCtx*>(s->obj_ctx);
- processor.reset(select_processor(obj_ctx, &multipart));
-
- // no filters by default
- filter = processor.get();
+ rgw_obj obj{s->bucket, s->object};
/* Handle object versioning of Swift API. */
if (! multipart) {
- rgw_obj obj(s->bucket, s->object);
op_ret = store->swift_versioning_copy(obj_ctx,
s->bucket_owner.get_id(),
s->bucket_info,
}
}
- op_ret = processor->prepare(store, NULL);
+ // create the object processor
+ using namespace rgw::putobj;
+ AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+ constexpr auto max_processor_size = std::max(sizeof(MultipartObjectProcessor),
+ sizeof(AtomicObjectProcessor));
+ ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
+
+ if (multipart) {
+ processor.emplace<MultipartObjectProcessor>(
+ &aio, store, s->bucket_info, s->owner.get_id(), obj_ctx, obj,
+ multipart_upload_id, multipart_part_num, multipart_part_str);
+ } else {
+ if (s->bucket_info.versioning_enabled()) {
+ if (!version_id.empty()) {
+ obj.key.set_instance(version_id);
+ } else {
+ store->gen_rand_obj_instance_name(&obj);
+ version_id = obj.key.instance;
+ }
+ }
+ processor.emplace<AtomicObjectProcessor>(
+ &aio, store, s->bucket_info, s->bucket_owner.get_id(),
+ obj_ctx, obj, olh_epoch, s->req_id);
+ }
+
+ op_ret = processor->prepare();
if (op_ret < 0) {
ldpp_dout(this, 20) << "processor->prepare() returned ret=" << op_ret
<< dendl;
}
fst = copy_source_range_fst;
-#if 0
+
+ // no filters by default
+ DataProcessor *filter = processor.get();
+
+ const auto& compression_type = store->get_zone_params().get_compression_type(
+ s->bucket_info.placement_rule);
+ CompressorRef plugin;
+ boost::optional<RGWPutObj_Compress> compressor;
+
+ std::unique_ptr<DataProcessor> encrypt;
op_ret = get_encrypt_filter(&encrypt, filter);
if (op_ret < 0) {
return;
}
if (encrypt != nullptr) {
- filter = encrypt.get();
- } else {
- //no encryption, we can try compression
- if (compression_type != "none") {
- plugin = get_compressor_plugin(s, compression_type);
- if (!plugin) {
- ldpp_dout(this, 1) << "Cannot load plugin for compression type "
- << compression_type << dendl;
- } else {
- compressor.emplace(s->cct, plugin, filter);
- filter = &*compressor;
- }
+ filter = &*encrypt;
+ } else if (compression_type != "none") {
+ plugin = get_compressor_plugin(s, compression_type);
+ if (!plugin) {
+ ldpp_dout(this, 1) << "Cannot load plugin for compression type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(s->cct, plugin, filter);
+ filter = &*compressor;
}
}
-#endif
tracepoint(rgw_op, before_data_transfer, s->req_id.c_str());
do {
bufferlist data;
op_ret = len;
ldpp_dout(this, 20) << "get_data() returned ret=" << op_ret << dendl;
return;
+ } else if (len == 0) {
+ break;
}
if (need_calc_md5) {
/* update torrrent */
torrent.update(data);
- /* do we need this operation to be synchronous? if we're dealing with an object with immutable
- * head, e.g., multipart object we need to make sure we're the first one writing to this object
- */
- bool need_to_wait = (ofs == 0) && multipart;
-
- bufferlist orig_data;
-
- if (need_to_wait) {
- orig_data = data;
- }
-
- op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
+ op_ret = filter->process(std::move(data), ofs);
if (op_ret < 0) {
- if (op_ret != -EEXIST) {
- ldpp_dout(this, 20) << "processor->thottle_data() returned ret="
- << op_ret << dendl;
- return;
- }
- /* need_to_wait == true and op_ret == -EEXIST */
- ldpp_dout(this, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
-
- /* restore original data */
- data.swap(orig_data);
-
- /* restart processing with different oid suffix */
- processor.reset(select_processor(obj_ctx, &multipart));
- filter = processor.get();
-
- string oid_rand;
- char buf[33];
- gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
- oid_rand.append(buf);
-
- op_ret = processor->prepare(store, &oid_rand);
- if (op_ret < 0) {
- ldpp_dout(this, 0) << "ERROR: processor->prepare() returned "
- << op_ret << dendl;
- return;
- }
-#if 0
- op_ret = get_encrypt_filter(&encrypt, filter);
- if (op_ret < 0) {
- return;
- }
- if (encrypt != nullptr) {
- filter = encrypt.get();
- } else {
- if (compressor) {
- compressor.emplace(s->cct, plugin, filter);
- filter = &*compressor;
- }
- }
-#endif
- op_ret = put_data_and_throttle(filter, data, ofs, false);
- if (op_ret < 0) {
- return;
- }
+ ldpp_dout(this, 20) << "processor->process() returned ret="
+ << op_ret << dendl;
+ return;
}
ofs += len;
} while (len > 0);
tracepoint(rgw_op, after_data_transfer, s->req_id.c_str(), ofs);
- {
- bufferlist flush;
- op_ret = put_data_and_throttle(filter, flush, ofs, false);
- if (op_ret < 0) {
- return;
- }
+ // flush any data in filters
+ op_ret = filter->process({}, ofs);
+ if (op_ret < 0) {
+ return;
}
if (!chunked_upload && ofs != s->content_length) {
tracepoint(rgw_op, processor_complete_enter, s->req_id.c_str());
op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
(delete_at ? *delete_at : real_time()), if_match, if_nomatch,
- (user_data.empty() ? nullptr : &user_data));
+ (user_data.empty() ? nullptr : &user_data), nullptr, nullptr);
tracepoint(rgw_op, processor_complete_exit, s->req_id.c_str());
- // only atomic upload will upate version_id here
- if (!multipart)
- version_id = (static_cast<RGWPutObjProcessor_Atomic *>(processor.get()))->get_version_id();
-
/* produce torrent */
if (s->cct->_conf->rgw_torrent_flag && (ofs == torrent.get_data_len()))
{
}
int RGWPutObj_ObjStore_S3::get_encrypt_filter(
- std::unique_ptr<RGWPutObjDataProcessor>* filter,
- RGWPutObjDataProcessor* cb)
+ std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+ rgw::putobj::DataProcessor *cb)
{
int res = 0;
- RGWPutObjProcessor_Multipart* multi_processor=dynamic_cast<RGWPutObjProcessor_Multipart*>(cb);
- if (multi_processor != nullptr) {
- RGWMPObj* mp = nullptr;
- multi_processor->get_mp(&mp);
- if (mp != nullptr) {
- map<string, bufferlist> xattrs;
- string meta_oid;
- meta_oid = mp->get_meta();
-
- rgw_obj obj;
- obj.init_ns(s->bucket, meta_oid, RGW_OBJ_NS_MULTIPART);
- obj.set_in_extra_data(true);
- res = get_obj_attrs(store, s, obj, xattrs);
- if (res == 0) {
- std::unique_ptr<BlockCrypt> block_crypt;
- /* We are adding to existing object.
- * We use crypto mode that configured as if we were decrypting. */
- res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses);
-#if 0
- if (res == 0 && block_crypt != nullptr)
- *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
- new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
-#endif
- }
+ if (!multipart_upload_id.empty()) {
+ RGWMPObj mp(s->object.name, multipart_upload_id);
+ rgw_obj obj;
+ obj.init_ns(s->bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
+ obj.set_in_extra_data(true);
+ map<string, bufferlist> xattrs;
+ res = get_obj_attrs(store, s, obj, xattrs);
+ if (res == 0) {
+ std::unique_ptr<BlockCrypt> block_crypt;
+ /* We are adding to existing object.
+ * We use crypto mode that configured as if we were decrypting. */
+ res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses);
+ if (res == 0 && block_crypt != nullptr)
+ filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
}
/* it is ok, to not have encryption at all */
}
{
std::unique_ptr<BlockCrypt> block_crypt;
res = rgw_s3_prepare_encrypt(s, attrs, nullptr, &block_crypt, crypt_http_responses);
-#if 0
if (res == 0 && block_crypt != nullptr) {
- *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
- new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
+ filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
}
-#endif
}
return res;
}
}
int RGWPostObj_ObjStore_S3::get_encrypt_filter(
- std::unique_ptr<RGWPutObjDataProcessor>* filter, RGWPutObjDataProcessor* cb)
+ std::unique_ptr<rgw::putobj::DataProcessor> *filter,
+ rgw::putobj::DataProcessor *cb)
{
- int res = 0;
std::unique_ptr<BlockCrypt> block_crypt;
- res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt, crypt_http_responses);
-#if 0
+ int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt,
+ crypt_http_responses);
if (res == 0 && block_crypt != nullptr) {
- *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
- new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
+ filter->reset(new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
}
- else
- *filter = nullptr;
-#endif
return res;
}