From 0c4d1dfabf83b2a3ed439beb1c30888fd35f719c Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Thu, 24 Nov 2016 14:20:33 +0100 Subject: [PATCH] rgw: implement the object creation in BulkUpload of Swift API. Signed-off-by: Radoslaw Zarzynski --- src/rgw/rgw_op.cc | 182 ++++++++++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_op.h | 7 ++ 2 files changed, 189 insertions(+) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index bd5f380623f..f906689cfaa 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -5742,6 +5742,182 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path) return op_ret; } + +bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo, + std::map& battrs, + ACLOwner& bucket_owner /* out */) +{ + RGWAccessControlPolicy bacl(store->ctx()); + op_ret = read_bucket_policy(store, s, binfo, battrs, &bacl, binfo.bucket); + if (op_ret < 0) { + ldout(s->cct, 20) << "bulk upload: cannot read_policy() for bucket" + << dendl; + return false; + } + + bucket_owner = bacl.get_owner(); + return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE); +} + +int RGWBulkUploadOp::handle_file(const boost::string_ref path, + const size_t size, + AlignedStreamGetter& body) +{ + + ldout(s->cct, 20) << "bulk upload: got file=" << path << ", size=" << size + << dendl; + + RGWPutObjDataProcessor *filter = nullptr; + boost::optional compressor; + + if (size > static_cast(s->cct->_conf->rgw_max_put_size)) { + op_ret = -ERR_TOO_LARGE; + return op_ret; + } + + std::string bucket_name; + rgw_obj_key object; + std::tie(bucket_name, object) = *parse_path(path); + + auto& obj_ctx = *static_cast(s->obj_ctx); + RGWBucketInfo binfo; + std::map battrs; + ACLOwner bowner; + op_ret = store->get_bucket_info(obj_ctx, s->user->user_id.tenant, + bucket_name, binfo, nullptr, &battrs); + if (op_ret == -ENOENT) { + ldout(s->cct, 20) << "bulk upload: non existent directory=" << bucket_name + << dendl; + } else if (op_ret < 0) { + return op_ret; + } + + if (! handle_file_verify_permission(binfo, battrs, bowner)) { + ldout(s->cct, 20) << "bulk upload: object creation unauthorized" << dendl; + op_ret = -EACCES; + return op_ret; + } + + op_ret = store->check_quota(bowner.get_id(), binfo.bucket, + user_quota, bucket_quota, size); + if (op_ret < 0) { + return op_ret; + } + + RGWPutObjProcessor_Atomic processor(obj_ctx, + binfo, + binfo.bucket, + object.name, + /* part size */ + s->cct->_conf->rgw_obj_stripe_size, + s->req_id, + binfo.versioning_enabled()); + + /* No filters by default. */ + filter = &processor; + + op_ret = processor.prepare(store, nullptr); + if (op_ret < 0) { + ldout(s->cct, 20) << "bulk upload: cannot prepare processor due to ret=" + << op_ret << dendl; + return op_ret; + } + + const auto& compression_type = store->get_zone_params().get_compression_type( + binfo.placement_rule); + CompressorRef plugin; + if (compression_type != "none") { + plugin = Compressor::create(s->cct, compression_type); + if (! plugin) { + ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type " + << compression_type << dendl; + } else { + compressor.emplace(s->cct, plugin, filter); + filter = &*compressor; + } + } + + /* Upload file content. */ + ssize_t len = 0; + size_t ofs = 0; + MD5 hash; + do { + ceph::bufferlist data; + len = body.get_at_most(s->cct->_conf->rgw_max_chunk_size, data); + + ldout(s->cct, 20) << "bulk upload: body=" << data.c_str() << dendl; + if (len < 0) { + op_ret = len; + return op_ret; + } else if (len > 0) { + hash.Update((const byte *)data.c_str(), data.length()); + op_ret = put_data_and_throttle(filter, data, ofs, false); + if (op_ret < 0) { + ldout(s->cct, 20) << "processor->thottle_data() returned ret=" + << op_ret << dendl; + return op_ret; + } + + ofs += len; + } + + } while (len > 0); + + if (ofs != size) { + ldout(s->cct, 10) << "bulk upload: real file size different from declared" + << dendl; + op_ret = -EINVAL; + } + + op_ret = store->check_quota(bowner.get_id(), binfo.bucket, + user_quota, bucket_quota, size); + if (op_ret < 0) { + ldout(s->cct, 20) << "bulk upload: quota exceeded for path=" << path + << dendl; + return op_ret; + } + + char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; + hash.Final(m); + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); + + /* Create metadata: ETAG. */ + std::map attrs; + std::string etag = calc_md5; + ceph::bufferlist etag_bl; + etag_bl.append(etag.c_str(), etag.size() + 1); + attrs.emplace(RGW_ATTR_ETAG, std::move(etag_bl)); + + /* Create metadata: ACLs. */ + RGWAccessControlPolicy policy; + policy.create_default(s->user->user_id, s->user->display_name); + ceph::bufferlist aclbl; + policy.encode(aclbl); + attrs.emplace(RGW_ATTR_ACL, std::move(aclbl)); + + /* Create metadata: compression info. */ + if (compressor && compressor->is_compressed()) { + ceph::bufferlist tmp; + RGWCompressionInfo cs_info; + cs_info.compression_type = plugin->get_type_name(); + cs_info.orig_size = s->obj_size; + cs_info.blocks = std::move(compressor->get_compression_blocks()); + ::encode(cs_info, tmp); + attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp)); + } + + /* Complete the transaction. */ + op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs, + ceph::real_time() /* delete_at */); + if (op_ret < 0) { + ldout(s->cct, 20) << "bulk upload: processor::complete returned op_ret=" + << op_ret << dendl; + } + + return op_ret; +} + void RGWBulkUploadOp::execute() { ceph::bufferlist buffer(64 * 1024); @@ -5774,6 +5950,12 @@ void RGWBulkUploadOp::execute() switch (header->get_filetype()) { case rgw::tar::FileType::NORMAL_FILE: { ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl; + + auto body = AlignedStreamGetter(0, header->get_filesize(), + rgw::tar::BLOCK_SIZE, *stream); + op_ret = handle_file(header->get_filename(), + header->get_filesize(), + body); break; } case rgw::tar::FileType::DIRECTORY: { diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index c671f85072f..df865926454 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -398,6 +398,13 @@ protected: boost::optional> parse_path(const boost::string_ref& path); + bool handle_file_verify_permission(RGWBucketInfo& binfo, + std::map& battrs, + ACLOwner& bucket_owner /* out */); + int handle_file(boost::string_ref path, + size_t size, + AlignedStreamGetter& body); + int handle_dir_verify_permission(); int handle_dir(boost::string_ref path); public: -- 2.39.5