From 5164d77ac1bbb2be4c62bbb1ad050f2c8840c10b Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Wed, 14 Sep 2016 13:25:53 +0200 Subject: [PATCH] rgw: RGWPostObj is able now to handle multiple files in single form. Signed-off-by: Radoslaw Zarzynski --- src/rgw/rgw_op.cc | 207 ++++++++++++++++++++++------------------- src/rgw/rgw_op.h | 5 + src/rgw/rgw_rest_s3.cc | 5 + src/rgw/rgw_rest_s3.h | 2 + 4 files changed, 121 insertions(+), 98 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 4ec0fb7688118..2f2efde1dd736 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3268,16 +3268,10 @@ void RGWPostObj::pre_exec() void RGWPostObj::execute() { RGWPutObjDataProcessor *filter = nullptr; - std::unique_ptr encrypt; - char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; - unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; - MD5 hash; - buffer::list bl, aclbl; - int len = 0; boost::optional compressor; CompressorRef plugin; - // read in the data from the POST form + /* Read in the data from the POST form. */ op_ret = get_params(); if (op_ret < 0) { return; @@ -3293,116 +3287,133 @@ void RGWPostObj::execute() return; } - op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket, - user_quota, bucket_quota, s->content_length); - if (op_ret < 0) { - return; - } - - RGWPutObjProcessor_Atomic processor(*static_cast(s->obj_ctx), - s->bucket_info, - s->bucket, - s->object.name, - /* part size */ - s->cct->_conf->rgw_obj_stripe_size, - s->req_id, - s->bucket_info.versioning_enabled()); + /* Start iteration over data fields. It's necessary as Swift's FormPost + * is capable to handle multiple files in single form. */ + do { + std::unique_ptr encrypt; + char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; + MD5 hash; + ceph::buffer::list bl, aclbl; + int len = 0; + + op_ret = store->check_quota(s->bucket_owner.get_id(), + s->bucket, + user_quota, + bucket_quota, + s->content_length); + if (op_ret < 0) { + return; + } - // no filters by default - filter = &processor; + RGWPutObjProcessor_Atomic processor(*static_cast(s->obj_ctx), + s->bucket_info, + s->bucket, + get_current_filename(), + /* part size */ + s->cct->_conf->rgw_obj_stripe_size, + s->req_id, + s->bucket_info.versioning_enabled()); + /* No filters by default. */ + filter = &processor; - op_ret = processor.prepare(store, nullptr); - if (op_ret < 0) - return; + op_ret = processor.prepare(store, nullptr); + if (op_ret < 0) { + return; + } - op_ret = get_encrypt_filter(&encrypt, filter); - if (op_ret < 0) { - return; - } - if (encrypt != nullptr) { - filter = encrypt.get(); - } else { - const auto& compression_type = store->get_zone_params().get_compression_type( - s->bucket_info.placement_rule); - if (compression_type != "none") { - plugin = Compressor::create(s->cct, compression_type); - if (!plugin) { - ldout(s->cct, 1) << "Cannot load plugin for compression type " - << compression_type << dendl; - } else { - compressor.emplace(s->cct, plugin, filter); - filter = &*compressor; + op_ret = get_encrypt_filter(&encrypt, filter); + if (op_ret < 0) { + return; + } + if (encrypt != nullptr) { + filter = encrypt.get(); + } else { + const auto& compression_type = store->get_zone_params().get_compression_type( + s->bucket_info.placement_rule); + if (compression_type != "none") { + plugin = Compressor::create(s->cct, compression_type); + if (!plugin) { + ldout(s->cct, 1) << "Cannot load plugin for compression type " + << compression_type << dendl; + } else { + compressor.emplace(s->cct, plugin, filter); + filter = &*compressor; + } } } - } - while (data_pending) { - bufferlist data; - len = get_data(data); + while (data_pending) { + bufferlist data; + len = get_data(data); - if (len < 0) { - op_ret = len; - return; - } + if (len < 0) { + op_ret = len; + return; + } - if (!len) - break; + if (!len) { + break; + } - hash.Update((const byte *)data.c_str(), data.length()); - op_ret = put_data_and_throttle(filter, data, ofs, false); + hash.Update((const byte *)data.c_str(), data.length()); + op_ret = put_data_and_throttle(filter, data, ofs, false); - ofs += len; + ofs += len; - if (ofs > max_len) { - op_ret = -ERR_TOO_LARGE; - return; - } - } - { - bufferlist flush; - op_ret = put_data_and_throttle(filter, flush, ofs, false); - } - if (len < min_len) { - op_ret = -ERR_TOO_SMALL; - return; - } + if (ofs > max_len) { + op_ret = -ERR_TOO_LARGE; + return; + } + } - s->obj_size = ofs; + { + bufferlist flush; + op_ret = put_data_and_throttle(filter, flush, ofs, false); + } - op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket, - user_quota, bucket_quota, s->obj_size); - if (op_ret < 0) { - return; - } + if (len < min_len) { + op_ret = -ERR_TOO_SMALL; + return; + } - hash.Final(m); - buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); + s->obj_size = ofs; - etag = calc_md5; - bl.append(etag.c_str(), etag.size() + 1); - emplace_attr(RGW_ATTR_ETAG, std::move(bl)); + op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket, + user_quota, bucket_quota, s->obj_size); + if (op_ret < 0) { + return; + } - policy.encode(aclbl); - emplace_attr(RGW_ATTR_ACL, std::move(aclbl)); + hash.Final(m); + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); - if (content_type.size()) { - bufferlist ct_bl; - ct_bl.append(content_type.c_str(), content_type.size() + 1); - emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl)); - } + etag = calc_md5; + bl.append(etag.c_str(), etag.size() + 1); + emplace_attr(RGW_ATTR_ETAG, std::move(bl)); - if (compressor && compressor->is_compressed()) { - bufferlist tmp; - RGWCompressionInfo cs_info; - cs_info.compression_type = plugin->get_type_name(); - cs_info.orig_size = s->obj_size; - cs_info.blocks = move(compressor->get_compression_blocks()); - ::encode(cs_info, tmp); - emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp)); - } + policy.encode(aclbl); + emplace_attr(RGW_ATTR_ACL, std::move(aclbl)); + + if (content_type.size()) { + bufferlist ct_bl; + ct_bl.append(content_type.c_str(), content_type.size() + 1); + emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl)); + } + + if (compressor && compressor->is_compressed()) { + ceph::bufferlist tmp; + RGWCompressionInfo cs_info; + cs_info.compression_type = plugin->get_type_name(); + cs_info.orig_size = s->obj_size; + cs_info.blocks = move(compressor->get_compression_blocks()); + ::encode(cs_info, tmp); + emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp)); + } - op_ret = processor.complete(s->obj_size, etag, NULL, real_time(), attrs, - (delete_at ? *delete_at : real_time())); + op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), + attrs, (delete_at ? *delete_at : real_time())); + } while (is_next_file_to_upload()); } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 11c88a37fa22e..060a4031c3631 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1020,6 +1020,11 @@ protected: map attrs; boost::optional delete_at; + /* Must be called after get_data() or the result is undefined. */ + virtual std::string get_current_filename() const = 0; + virtual bool is_next_file_to_upload() { + return false; + } public: RGWPostObj() : min_len(0), max_len(LLONG_MAX), diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 1f504b7b0ef4c..b790f254e4be8 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -1768,6 +1768,11 @@ void RGWPostObj_ObjStore_S3::rebuild_key(string& key) key = new_key; } +std::string RGWPostObj_ObjStore_S3::get_current_filename() const +{ + return s->object.name; +} + int RGWPostObj_ObjStore_S3::get_params() { // get the part boundary diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index 291b04b9281ea..5b1f8143c2d1a 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -235,6 +235,8 @@ class RGWPostObj_ObjStore_S3 : public RGWPostObj_ObjStore { int get_policy(); void rebuild_key(string& key); + + std::string get_current_filename() const override; public: RGWPostObj_ObjStore_S3() {} ~RGWPostObj_ObjStore_S3() override {} -- 2.39.5