rgw_bucket_object_pre_exec(s);
}
+boost::optional<std::pair<std::string, rgw_obj_key>>
+RGWBulkUploadOp::parse_path(const boost::string_ref& path)
+{
+ /* We need to skip all slashes at the beginning in order to preserve
+ * compliance with Swift. */
+ const size_t start_pos = path.find_first_not_of('/');
+
+ if (boost::string_ref::npos != start_pos) {
+ /* Seperator is the first slash after the leading ones. */
+ const size_t sep_pos = path.substr(start_pos).find('/');
+
+ if (boost::string_ref::npos != sep_pos) {
+ const auto bucket_name = path.substr(start_pos, sep_pos - start_pos);
+ const auto obj_name = path.substr(sep_pos + 1);
+
+ return std::make_pair(bucket_name.to_string(),
+ rgw_obj_key(obj_name.to_string()));
+ } else {
+ /* It's guaranteed here that bucket name is at least one character
+ * long and is different than slash. */
+ return std::make_pair(path.substr(start_pos).to_string(),
+ rgw_obj_key());
+ }
+ }
+
+ return boost::none;
+}
+
+int RGWBulkUploadOp::handle_dir_verify_permission()
+{
+ if (s->user->max_buckets > 0) {
+ RGWUserBuckets buckets;
+ std::string marker;
+ bool is_truncated;
+ op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
+ marker, std::string(), s->user->max_buckets,
+ false, &is_truncated);
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ if (buckets.count() >= static_cast<size_t>(s->user->max_buckets)) {
+ return -ERR_TOO_MANY_BUCKETS;
+ }
+ }
+
+ return 0;
+}
+
+int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
+{
+ ldout(s->cct, 20) << "bulk upload: got directory=" << path << dendl;
+
+ op_ret = handle_dir_verify_permission();
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ std::string bucket_name;
+ rgw_obj_key object_junk;
+ std::tie(bucket_name, object_junk) = *parse_path(path);
+
+ rgw_raw_obj obj(store->get_zone_params().domain_root,
+ rgw_make_bucket_entry_name(s->bucket_tenant, bucket_name));
+
+ /* Swift API doesn't support location constraint. We're just checking here
+ * whether creation is taking place in the master zone or not. */
+ if (! store->get_zonegroup().is_master) {
+ ldout(s->cct, 0) << "creating bucket in a non-master zone." << dendl;
+ op_ret = -EINVAL;
+ return op_ret;
+ }
+
+ /* we need to make sure we read bucket info, it's not read before for this
+ * specific request */
+ RGWBucketInfo binfo;
+ std::map<std::string, ceph::bufferlist> battrs;
+ RGWObjectCtx obj_ctx(store); // = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+ op_ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, bucket_name,
+ binfo, NULL, &battrs);
+ if (op_ret < 0 && op_ret != -ENOENT) {
+ return op_ret;
+ }
+ const bool bucket_exists = (op_ret != -ENOENT);
+
+ if (bucket_exists) {
+ RGWAccessControlPolicy old_policy(s->cct);
+ int r = get_bucket_policy_from_attr(s->cct, store, binfo,
+ battrs, &old_policy);
+ if (r >= 0) {
+ if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
+ op_ret = -EEXIST;
+ return op_ret;
+ }
+ }
+ }
+
+ RGWBucketInfo master_info;
+ rgw_bucket *pmaster_bucket = nullptr;
+ real_time creation_time;
+ obj_version objv, ep_objv, *pobjv = nullptr;
+
+ if (! store->is_meta_master()) {
+ JSONParser jp;
+ ceph::bufferlist in_data;
+ op_ret = forward_request_to_master(s, nullptr, store, in_data, &jp);
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
+ JSONDecoder::decode_json("object_ver", objv, &jp);
+ JSONDecoder::decode_json("bucket_info", master_info, &jp);
+
+ ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver="
+ << objv.ver << dendl;
+ ldout(s->cct, 20) << "got creation_time="<< master_info.creation_time
+ << dendl;
+
+ pmaster_bucket= &master_info.bucket;
+ creation_time = master_info.creation_time;
+ pobjv = &objv;
+ } else {
+ pmaster_bucket = nullptr;
+ }
+
+
+ std::string placement_rule;
+ if (bucket_exists) {
+ std::string selected_placement_rule;
+ rgw_bucket bucket;
+ bucket.tenant = s->bucket_tenant;
+ bucket.name = s->bucket_name;
+ op_ret = store->select_bucket_placement(*(s->user),
+ store->get_zonegroup().get_id(),
+ placement_rule,
+ bucket,
+ &selected_placement_rule,
+ nullptr);
+ if (selected_placement_rule != binfo.placement_rule) {
+ op_ret = -EEXIST;
+ ldout(s->cct, 20) << "bulk upload: non-coherent placement rule" << dendl;
+ return op_ret;
+ }
+ }
+
+ /* Create metadata: ACLs. */
+ std::map<std::string, ceph::bufferlist> attrs;
+ RGWAccessControlPolicy policy;
+ policy.create_default(s->user->user_id, s->user->display_name);
+ ceph::bufferlist aclbl;
+ policy.encode(aclbl);
+ attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
+
+ RGWQuotaInfo quota_info;
+ const RGWQuotaInfo * pquota_info = nullptr;
+
+ rgw_bucket bucket;
+ bucket.tenant = s->bucket_tenant; /* ignored if bucket exists */
+ bucket.name = bucket_name;
+
+
+ RGWBucketInfo out_info;
+ op_ret = store->create_bucket(*(s->user),
+ bucket,
+ store->get_zonegroup().get_id(),
+ placement_rule, binfo.swift_ver_location,
+ pquota_info, attrs,
+ out_info, pobjv, &ep_objv, creation_time,
+ pmaster_bucket, true);
+ /* continue if EEXIST and create_bucket will fail below. this way we can
+ * recover from a partial create by retrying it. */
+ ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << op_ret
+ << ", bucket=" << bucket << dendl;
+
+ if (op_ret && op_ret != -EEXIST) {
+ return op_ret;
+ }
+
+ const bool existed = (op_ret == -EEXIST);
+ if (existed) {
+ /* bucket already existed, might have raced with another bucket creation, or
+ * might be partial bucket creation that never completed. Read existing bucket
+ * info, verify that the reported bucket owner is the current user.
+ * If all is ok then update the user's list of buckets.
+ * Otherwise inform client about a name conflict.
+ */
+ if (out_info.owner.compare(s->user->user_id) != 0) {
+ op_ret = -EEXIST;
+ ldout(s->cct, 20) << "bulk upload: conflicting bucket name" << dendl;
+ return op_ret;
+ }
+ bucket = out_info.bucket;
+ }
+
+ op_ret = rgw_link_bucket(store, s->user->user_id, bucket,
+ out_info.creation_time, false);
+ if (op_ret && !existed && op_ret != -EEXIST) {
+ /* if it exists (or previously existed), don't remove it! */
+ op_ret = rgw_unlink_bucket(store, s->user->user_id,
+ bucket.tenant, bucket.name);
+ if (op_ret < 0) {
+ ldout(s->cct, 0) << "bulk upload: WARNING: failed to unlink bucket: ret="
+ << op_ret << dendl;
+ }
+ } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
+ ldout(s->cct, 20) << "bulk upload: containers already exists"
+ << dendl;
+ op_ret = -ERR_BUCKET_EXISTS;
+ }
+
+ return op_ret;
+}
+
void RGWBulkUploadOp::execute()
{
ceph::bufferlist buffer(64 * 1024);
}
case rgw::tar::FileType::DIRECTORY: {
ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl;
+
+ op_ret = handle_dir(header->get_filename());
break;
}
default: {