return op_ret;
}
+
+bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
+ std::map<std::string, ceph::bufferlist>& battrs,
+ ACLOwner& bucket_owner /* out */)
+{
+ RGWAccessControlPolicy bacl(store->ctx());
+ op_ret = read_bucket_policy(store, s, binfo, battrs, &bacl, binfo.bucket);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "bulk upload: cannot read_policy() for bucket"
+ << dendl;
+ return false;
+ }
+
+ bucket_owner = bacl.get_owner();
+ return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE);
+}
+
+int RGWBulkUploadOp::handle_file(const boost::string_ref path,
+ const size_t size,
+ AlignedStreamGetter& body)
+{
+
+ ldout(s->cct, 20) << "bulk upload: got file=" << path << ", size=" << size
+ << dendl;
+
+ RGWPutObjDataProcessor *filter = nullptr;
+ boost::optional<RGWPutObj_Compress> compressor;
+
+ if (size > static_cast<const size_t>(s->cct->_conf->rgw_max_put_size)) {
+ op_ret = -ERR_TOO_LARGE;
+ return op_ret;
+ }
+
+ std::string bucket_name;
+ rgw_obj_key object;
+ std::tie(bucket_name, object) = *parse_path(path);
+
+ auto& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+ RGWBucketInfo binfo;
+ std::map<std::string, ceph::bufferlist> battrs;
+ ACLOwner bowner;
+ op_ret = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
+ bucket_name, binfo, nullptr, &battrs);
+ if (op_ret == -ENOENT) {
+ ldout(s->cct, 20) << "bulk upload: non existent directory=" << bucket_name
+ << dendl;
+ } else if (op_ret < 0) {
+ return op_ret;
+ }
+
+ if (! handle_file_verify_permission(binfo, battrs, bowner)) {
+ ldout(s->cct, 20) << "bulk upload: object creation unauthorized" << dendl;
+ op_ret = -EACCES;
+ return op_ret;
+ }
+
+ op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
+ user_quota, bucket_quota, size);
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ RGWPutObjProcessor_Atomic processor(obj_ctx,
+ binfo,
+ binfo.bucket,
+ object.name,
+ /* part size */
+ s->cct->_conf->rgw_obj_stripe_size,
+ s->req_id,
+ binfo.versioning_enabled());
+
+ /* No filters by default. */
+ filter = &processor;
+
+ op_ret = processor.prepare(store, nullptr);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "bulk upload: cannot prepare processor due to ret="
+ << op_ret << dendl;
+ return op_ret;
+ }
+
+ const auto& compression_type = store->get_zone_params().get_compression_type(
+ binfo.placement_rule);
+ CompressorRef plugin;
+ if (compression_type != "none") {
+ plugin = Compressor::create(s->cct, compression_type);
+ if (! plugin) {
+ ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(s->cct, plugin, filter);
+ filter = &*compressor;
+ }
+ }
+
+ /* Upload file content. */
+ ssize_t len = 0;
+ size_t ofs = 0;
+ MD5 hash;
+ do {
+ ceph::bufferlist data;
+ len = body.get_at_most(s->cct->_conf->rgw_max_chunk_size, data);
+
+ ldout(s->cct, 20) << "bulk upload: body=" << data.c_str() << dendl;
+ if (len < 0) {
+ op_ret = len;
+ return op_ret;
+ } else if (len > 0) {
+ hash.Update((const byte *)data.c_str(), data.length());
+ op_ret = put_data_and_throttle(filter, data, ofs, false);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "processor->thottle_data() returned ret="
+ << op_ret << dendl;
+ return op_ret;
+ }
+
+ ofs += len;
+ }
+
+ } while (len > 0);
+
+ if (ofs != size) {
+ ldout(s->cct, 10) << "bulk upload: real file size different from declared"
+ << dendl;
+ op_ret = -EINVAL;
+ }
+
+ op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
+ user_quota, bucket_quota, size);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "bulk upload: quota exceeded for path=" << path
+ << dendl;
+ return op_ret;
+ }
+
+ char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+ unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ hash.Final(m);
+ buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
+
+ /* Create metadata: ETAG. */
+ std::map<std::string, ceph::bufferlist> attrs;
+ std::string etag = calc_md5;
+ ceph::bufferlist etag_bl;
+ etag_bl.append(etag.c_str(), etag.size() + 1);
+ attrs.emplace(RGW_ATTR_ETAG, std::move(etag_bl));
+
+ /* Create metadata: ACLs. */
+ RGWAccessControlPolicy policy;
+ policy.create_default(s->user->user_id, s->user->display_name);
+ ceph::bufferlist aclbl;
+ policy.encode(aclbl);
+ attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
+
+ /* Create metadata: compression info. */
+ if (compressor && compressor->is_compressed()) {
+ ceph::bufferlist tmp;
+ RGWCompressionInfo cs_info;
+ cs_info.compression_type = plugin->get_type_name();
+ cs_info.orig_size = s->obj_size;
+ cs_info.blocks = std::move(compressor->get_compression_blocks());
+ ::encode(cs_info, tmp);
+ attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp));
+ }
+
+ /* Complete the transaction. */
+ op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs,
+ ceph::real_time() /* delete_at */);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "bulk upload: processor::complete returned op_ret="
+ << op_ret << dendl;
+ }
+
+ return op_ret;
+}
+
void RGWBulkUploadOp::execute()
{
ceph::bufferlist buffer(64 * 1024);
switch (header->get_filetype()) {
case rgw::tar::FileType::NORMAL_FILE: {
ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl;
+
+ auto body = AlignedStreamGetter(0, header->get_filesize(),
+ rgw::tar::BLOCK_SIZE, *stream);
+ op_ret = handle_file(header->get_filename(),
+ header->get_filesize(),
+ body);
break;
}
case rgw::tar::FileType::DIRECTORY: {