void RGWPostObj::execute()
{
RGWPutObjDataProcessor *filter = nullptr;
- std::unique_ptr<RGWPutObjDataProcessor> encrypt;
- char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
- unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
- MD5 hash;
- buffer::list bl, aclbl;
- int len = 0;
boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;
- // read in the data from the POST form
+ /* Read in the data from the POST form. */
op_ret = get_params();
if (op_ret < 0) {
return;
return;
}
- op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
- user_quota, bucket_quota, s->content_length);
- if (op_ret < 0) {
- return;
- }
-
- RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
- s->bucket_info,
- s->bucket,
- s->object.name,
- /* part size */
- s->cct->_conf->rgw_obj_stripe_size,
- s->req_id,
- s->bucket_info.versioning_enabled());
+ /* Start iteration over data fields. It's necessary as Swift's FormPost
+ * is capable to handle multiple files in single form. */
+ do {
+ std::unique_ptr<RGWPutObjDataProcessor> encrypt;
+ char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+ unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ MD5 hash;
+ ceph::buffer::list bl, aclbl;
+ int len = 0;
+
+ op_ret = store->check_quota(s->bucket_owner.get_id(),
+ s->bucket,
+ user_quota,
+ bucket_quota,
+ s->content_length);
+ if (op_ret < 0) {
+ return;
+ }
- // no filters by default
- filter = &processor;
+ RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
+ s->bucket_info,
+ s->bucket,
+ get_current_filename(),
+ /* part size */
+ s->cct->_conf->rgw_obj_stripe_size,
+ s->req_id,
+ s->bucket_info.versioning_enabled());
+ /* No filters by default. */
+ filter = &processor;
- op_ret = processor.prepare(store, nullptr);
- if (op_ret < 0)
- return;
+ op_ret = processor.prepare(store, nullptr);
+ if (op_ret < 0) {
+ return;
+ }
- op_ret = get_encrypt_filter(&encrypt, filter);
- if (op_ret < 0) {
- return;
- }
- if (encrypt != nullptr) {
- filter = encrypt.get();
- } else {
- const auto& compression_type = store->get_zone_params().get_compression_type(
- s->bucket_info.placement_rule);
- if (compression_type != "none") {
- plugin = Compressor::create(s->cct, compression_type);
- if (!plugin) {
- ldout(s->cct, 1) << "Cannot load plugin for compression type "
- << compression_type << dendl;
- } else {
- compressor.emplace(s->cct, plugin, filter);
- filter = &*compressor;
+ op_ret = get_encrypt_filter(&encrypt, filter);
+ if (op_ret < 0) {
+ return;
+ }
+ if (encrypt != nullptr) {
+ filter = encrypt.get();
+ } else {
+ const auto& compression_type = store->get_zone_params().get_compression_type(
+ s->bucket_info.placement_rule);
+ if (compression_type != "none") {
+ plugin = Compressor::create(s->cct, compression_type);
+ if (!plugin) {
+ ldout(s->cct, 1) << "Cannot load plugin for compression type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(s->cct, plugin, filter);
+ filter = &*compressor;
+ }
}
}
- }
- while (data_pending) {
- bufferlist data;
- len = get_data(data);
+ while (data_pending) {
+ bufferlist data;
+ len = get_data(data);
- if (len < 0) {
- op_ret = len;
- return;
- }
+ if (len < 0) {
+ op_ret = len;
+ return;
+ }
- if (!len)
- break;
+ if (!len) {
+ break;
+ }
- hash.Update((const byte *)data.c_str(), data.length());
- op_ret = put_data_and_throttle(filter, data, ofs, false);
+ hash.Update((const byte *)data.c_str(), data.length());
+ op_ret = put_data_and_throttle(filter, data, ofs, false);
- ofs += len;
+ ofs += len;
- if (ofs > max_len) {
- op_ret = -ERR_TOO_LARGE;
- return;
- }
- }
- {
- bufferlist flush;
- op_ret = put_data_and_throttle(filter, flush, ofs, false);
- }
- if (len < min_len) {
- op_ret = -ERR_TOO_SMALL;
- return;
- }
+ if (ofs > max_len) {
+ op_ret = -ERR_TOO_LARGE;
+ return;
+ }
+ }
- s->obj_size = ofs;
+ {
+ bufferlist flush;
+ op_ret = put_data_and_throttle(filter, flush, ofs, false);
+ }
- op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
- user_quota, bucket_quota, s->obj_size);
- if (op_ret < 0) {
- return;
- }
+ if (len < min_len) {
+ op_ret = -ERR_TOO_SMALL;
+ return;
+ }
- hash.Final(m);
- buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
+ s->obj_size = ofs;
- etag = calc_md5;
- bl.append(etag.c_str(), etag.size() + 1);
- emplace_attr(RGW_ATTR_ETAG, std::move(bl));
+ op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
+ user_quota, bucket_quota, s->obj_size);
+ if (op_ret < 0) {
+ return;
+ }
- policy.encode(aclbl);
- emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
+ hash.Final(m);
+ buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
- if (content_type.size()) {
- bufferlist ct_bl;
- ct_bl.append(content_type.c_str(), content_type.size() + 1);
- emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
- }
+ etag = calc_md5;
+ bl.append(etag.c_str(), etag.size() + 1);
+ emplace_attr(RGW_ATTR_ETAG, std::move(bl));
- if (compressor && compressor->is_compressed()) {
- bufferlist tmp;
- RGWCompressionInfo cs_info;
- cs_info.compression_type = plugin->get_type_name();
- cs_info.orig_size = s->obj_size;
- cs_info.blocks = move(compressor->get_compression_blocks());
- ::encode(cs_info, tmp);
- emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
- }
+ policy.encode(aclbl);
+ emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
+
+ if (content_type.size()) {
+ bufferlist ct_bl;
+ ct_bl.append(content_type.c_str(), content_type.size() + 1);
+ emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
+ }
+
+ if (compressor && compressor->is_compressed()) {
+ ceph::bufferlist tmp;
+ RGWCompressionInfo cs_info;
+ cs_info.compression_type = plugin->get_type_name();
+ cs_info.orig_size = s->obj_size;
+ cs_info.blocks = move(compressor->get_compression_blocks());
+ ::encode(cs_info, tmp);
+ emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
+ }
- op_ret = processor.complete(s->obj_size, etag, NULL, real_time(), attrs,
- (delete_at ? *delete_at : real_time()));
+ op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(),
+ attrs, (delete_at ? *delete_at : real_time()));
+ } while (is_next_file_to_upload());
}