From: Radoslaw Zarzynski Date: Thu, 24 Nov 2016 12:10:56 +0000 (+0100) Subject: rgw: implement the container creation in BulkUpload of Swift API. X-Git-Tag: v12.0.2~214^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e9afe386658dfe106d73dc492bd3f4d8fd30e7aa;p=ceph.git rgw: implement the container creation in BulkUpload of Swift API. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index fea8872b882c..bd5f380623f0 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -5528,6 +5528,220 @@ void RGWBulkUploadOp::pre_exec() rgw_bucket_object_pre_exec(s); } +boost::optional> +RGWBulkUploadOp::parse_path(const boost::string_ref& path) +{ + /* We need to skip all slashes at the beginning in order to preserve + * compliance with Swift. */ + const size_t start_pos = path.find_first_not_of('/'); + + if (boost::string_ref::npos != start_pos) { + /* Seperator is the first slash after the leading ones. */ + const size_t sep_pos = path.substr(start_pos).find('/'); + + if (boost::string_ref::npos != sep_pos) { + const auto bucket_name = path.substr(start_pos, sep_pos - start_pos); + const auto obj_name = path.substr(sep_pos + 1); + + return std::make_pair(bucket_name.to_string(), + rgw_obj_key(obj_name.to_string())); + } else { + /* It's guaranteed here that bucket name is at least one character + * long and is different than slash. */ + return std::make_pair(path.substr(start_pos).to_string(), + rgw_obj_key()); + } + } + + return boost::none; +} + +int RGWBulkUploadOp::handle_dir_verify_permission() +{ + if (s->user->max_buckets > 0) { + RGWUserBuckets buckets; + std::string marker; + bool is_truncated; + op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets, + marker, std::string(), s->user->max_buckets, + false, &is_truncated); + if (op_ret < 0) { + return op_ret; + } + + if (buckets.count() >= static_cast(s->user->max_buckets)) { + return -ERR_TOO_MANY_BUCKETS; + } + } + + return 0; +} + +int RGWBulkUploadOp::handle_dir(const boost::string_ref path) +{ + ldout(s->cct, 20) << "bulk upload: got directory=" << path << dendl; + + op_ret = handle_dir_verify_permission(); + if (op_ret < 0) { + return op_ret; + } + + std::string bucket_name; + rgw_obj_key object_junk; + std::tie(bucket_name, object_junk) = *parse_path(path); + + rgw_raw_obj obj(store->get_zone_params().domain_root, + rgw_make_bucket_entry_name(s->bucket_tenant, bucket_name)); + + /* Swift API doesn't support location constraint. We're just checking here + * whether creation is taking place in the master zone or not. */ + if (! store->get_zonegroup().is_master) { + ldout(s->cct, 0) << "creating bucket in a non-master zone." << dendl; + op_ret = -EINVAL; + return op_ret; + } + + /* we need to make sure we read bucket info, it's not read before for this + * specific request */ + RGWBucketInfo binfo; + std::map battrs; + RGWObjectCtx obj_ctx(store); // = *static_cast(s->obj_ctx); + op_ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, bucket_name, + binfo, NULL, &battrs); + if (op_ret < 0 && op_ret != -ENOENT) { + return op_ret; + } + const bool bucket_exists = (op_ret != -ENOENT); + + if (bucket_exists) { + RGWAccessControlPolicy old_policy(s->cct); + int r = get_bucket_policy_from_attr(s->cct, store, binfo, + battrs, &old_policy); + if (r >= 0) { + if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) { + op_ret = -EEXIST; + return op_ret; + } + } + } + + RGWBucketInfo master_info; + rgw_bucket *pmaster_bucket = nullptr; + real_time creation_time; + obj_version objv, ep_objv, *pobjv = nullptr; + + if (! store->is_meta_master()) { + JSONParser jp; + ceph::bufferlist in_data; + op_ret = forward_request_to_master(s, nullptr, store, in_data, &jp); + if (op_ret < 0) { + return op_ret; + } + + JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp); + JSONDecoder::decode_json("object_ver", objv, &jp); + JSONDecoder::decode_json("bucket_info", master_info, &jp); + + ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" + << objv.ver << dendl; + ldout(s->cct, 20) << "got creation_time="<< master_info.creation_time + << dendl; + + pmaster_bucket= &master_info.bucket; + creation_time = master_info.creation_time; + pobjv = &objv; + } else { + pmaster_bucket = nullptr; + } + + + std::string placement_rule; + if (bucket_exists) { + std::string selected_placement_rule; + rgw_bucket bucket; + bucket.tenant = s->bucket_tenant; + bucket.name = s->bucket_name; + op_ret = store->select_bucket_placement(*(s->user), + store->get_zonegroup().get_id(), + placement_rule, + bucket, + &selected_placement_rule, + nullptr); + if (selected_placement_rule != binfo.placement_rule) { + op_ret = -EEXIST; + ldout(s->cct, 20) << "bulk upload: non-coherent placement rule" << dendl; + return op_ret; + } + } + + /* Create metadata: ACLs. */ + std::map attrs; + RGWAccessControlPolicy policy; + policy.create_default(s->user->user_id, s->user->display_name); + ceph::bufferlist aclbl; + policy.encode(aclbl); + attrs.emplace(RGW_ATTR_ACL, std::move(aclbl)); + + RGWQuotaInfo quota_info; + const RGWQuotaInfo * pquota_info = nullptr; + + rgw_bucket bucket; + bucket.tenant = s->bucket_tenant; /* ignored if bucket exists */ + bucket.name = bucket_name; + + + RGWBucketInfo out_info; + op_ret = store->create_bucket(*(s->user), + bucket, + store->get_zonegroup().get_id(), + placement_rule, binfo.swift_ver_location, + pquota_info, attrs, + out_info, pobjv, &ep_objv, creation_time, + pmaster_bucket, true); + /* continue if EEXIST and create_bucket will fail below. this way we can + * recover from a partial create by retrying it. */ + ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << op_ret + << ", bucket=" << bucket << dendl; + + if (op_ret && op_ret != -EEXIST) { + return op_ret; + } + + const bool existed = (op_ret == -EEXIST); + if (existed) { + /* bucket already existed, might have raced with another bucket creation, or + * might be partial bucket creation that never completed. Read existing bucket + * info, verify that the reported bucket owner is the current user. + * If all is ok then update the user's list of buckets. + * Otherwise inform client about a name conflict. + */ + if (out_info.owner.compare(s->user->user_id) != 0) { + op_ret = -EEXIST; + ldout(s->cct, 20) << "bulk upload: conflicting bucket name" << dendl; + return op_ret; + } + bucket = out_info.bucket; + } + + op_ret = rgw_link_bucket(store, s->user->user_id, bucket, + out_info.creation_time, false); + if (op_ret && !existed && op_ret != -EEXIST) { + /* if it exists (or previously existed), don't remove it! */ + op_ret = rgw_unlink_bucket(store, s->user->user_id, + bucket.tenant, bucket.name); + if (op_ret < 0) { + ldout(s->cct, 0) << "bulk upload: WARNING: failed to unlink bucket: ret=" + << op_ret << dendl; + } + } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) { + ldout(s->cct, 20) << "bulk upload: containers already exists" + << dendl; + op_ret = -ERR_BUCKET_EXISTS; + } + + return op_ret; +} + void RGWBulkUploadOp::execute() { ceph::bufferlist buffer(64 * 1024); @@ -5564,6 +5778,8 @@ void RGWBulkUploadOp::execute() } case rgw::tar::FileType::DIRECTORY: { ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl; + + op_ret = handle_dir(header->get_filename()); break; } default: { diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index c17c4dbe3862..c671f85072f4 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -395,6 +395,11 @@ protected: virtual std::unique_ptr create_stream() = 0; virtual void send_response() = 0; + boost::optional> + parse_path(const boost::string_ref& path); + + int handle_dir_verify_permission(); + int handle_dir(boost::string_ref path); public: int verify_permission() override; void pre_exec() override;