]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement the container creation in BulkUpload of Swift API.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Thu, 24 Nov 2016 12:10:56 +0000 (13:10 +0100)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Sun, 2 Apr 2017 16:35:21 +0000 (18:35 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h

index fea8872b882cd8eb7c2791c44e72e0a8d5a4b6c3..bd5f380623f09ac9b29e14b1d49b64b02bd4b5fa 100644 (file)
@@ -5528,6 +5528,220 @@ void RGWBulkUploadOp::pre_exec()
   rgw_bucket_object_pre_exec(s);
 }
 
+boost::optional<std::pair<std::string, rgw_obj_key>>
+RGWBulkUploadOp::parse_path(const boost::string_ref& path)
+{
+  /* We need to skip all slashes at the beginning in order to preserve
+   * compliance with Swift. */
+  const size_t start_pos = path.find_first_not_of('/');
+
+  if (boost::string_ref::npos != start_pos) {
+    /* Seperator is the first slash after the leading ones. */
+    const size_t sep_pos = path.substr(start_pos).find('/');
+
+    if (boost::string_ref::npos != sep_pos) {
+      const auto bucket_name = path.substr(start_pos, sep_pos - start_pos);
+      const auto obj_name = path.substr(sep_pos + 1);
+
+      return std::make_pair(bucket_name.to_string(),
+                            rgw_obj_key(obj_name.to_string()));
+    } else {
+      /* It's guaranteed here that bucket name is at least one character
+       * long and is different than slash. */
+      return std::make_pair(path.substr(start_pos).to_string(),
+                            rgw_obj_key());
+    }
+  }
+
+  return boost::none;
+}
+
+int RGWBulkUploadOp::handle_dir_verify_permission()
+{
+  if (s->user->max_buckets > 0) {
+    RGWUserBuckets buckets;
+    std::string marker;
+    bool is_truncated;
+    op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
+                                   marker, std::string(), s->user->max_buckets,
+                                   false, &is_truncated);
+    if (op_ret < 0) {
+      return op_ret;
+    }
+
+    if (buckets.count() >= static_cast<size_t>(s->user->max_buckets)) {
+      return -ERR_TOO_MANY_BUCKETS;
+    }
+  }
+
+  return 0;
+}
+
+int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
+{
+  ldout(s->cct, 20) << "bulk upload: got directory=" << path << dendl;
+
+  op_ret = handle_dir_verify_permission();
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
+  std::string bucket_name;
+  rgw_obj_key object_junk;
+  std::tie(bucket_name, object_junk) =  *parse_path(path);
+
+  rgw_raw_obj obj(store->get_zone_params().domain_root,
+                  rgw_make_bucket_entry_name(s->bucket_tenant, bucket_name));
+
+  /* Swift API doesn't support location constraint. We're just checking here
+   * whether creation is taking place in the master zone or not. */
+  if (! store->get_zonegroup().is_master) {
+    ldout(s->cct, 0) << "creating bucket in a non-master zone." << dendl;
+    op_ret = -EINVAL;
+    return op_ret;
+  }
+
+  /* we need to make sure we read bucket info, it's not read before for this
+   * specific request */
+  RGWBucketInfo binfo;
+  std::map<std::string, ceph::bufferlist> battrs;
+  RGWObjectCtx obj_ctx(store); // = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+  op_ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, bucket_name,
+                                  binfo, NULL, &battrs);
+  if (op_ret < 0 && op_ret != -ENOENT) {
+    return op_ret;
+  }
+  const bool bucket_exists = (op_ret != -ENOENT);
+
+  if (bucket_exists) {
+    RGWAccessControlPolicy old_policy(s->cct);
+    int r = get_bucket_policy_from_attr(s->cct, store, binfo,
+                                        battrs, &old_policy);
+    if (r >= 0)  {
+      if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
+        op_ret = -EEXIST;
+        return op_ret;
+      }
+    }
+  }
+
+  RGWBucketInfo master_info;
+  rgw_bucket *pmaster_bucket = nullptr;
+  real_time creation_time;
+  obj_version objv, ep_objv, *pobjv = nullptr;
+
+  if (! store->is_meta_master()) {
+    JSONParser jp;
+    ceph::bufferlist in_data;
+    op_ret = forward_request_to_master(s, nullptr, store, in_data, &jp);
+    if (op_ret < 0) {
+      return op_ret;
+    }
+
+    JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
+    JSONDecoder::decode_json("object_ver", objv, &jp);
+    JSONDecoder::decode_json("bucket_info", master_info, &jp);
+
+    ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver="
+                      << objv.ver << dendl;
+    ldout(s->cct, 20) << "got creation_time="<< master_info.creation_time
+                      << dendl;
+
+    pmaster_bucket= &master_info.bucket;
+    creation_time = master_info.creation_time;
+    pobjv = &objv;
+  } else {
+    pmaster_bucket = nullptr;
+  }
+
+
+  std::string placement_rule;
+  if (bucket_exists) {
+    std::string selected_placement_rule;
+    rgw_bucket bucket;
+    bucket.tenant = s->bucket_tenant;
+    bucket.name = s->bucket_name;
+    op_ret = store->select_bucket_placement(*(s->user),
+                                            store->get_zonegroup().get_id(),
+                                            placement_rule,
+                                            bucket,
+                                            &selected_placement_rule,
+                                            nullptr);
+    if (selected_placement_rule != binfo.placement_rule) {
+      op_ret = -EEXIST;
+      ldout(s->cct, 20) << "bulk upload: non-coherent placement rule" << dendl;
+      return op_ret;
+    }
+  }
+
+  /* Create metadata: ACLs. */
+  std::map<std::string, ceph::bufferlist> attrs;
+  RGWAccessControlPolicy policy;
+  policy.create_default(s->user->user_id, s->user->display_name);
+  ceph::bufferlist aclbl;
+  policy.encode(aclbl);
+  attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
+
+  RGWQuotaInfo quota_info;
+  const RGWQuotaInfo * pquota_info = nullptr;
+
+  rgw_bucket bucket;
+  bucket.tenant = s->bucket_tenant; /* ignored if bucket exists */
+  bucket.name = bucket_name;
+
+
+  RGWBucketInfo out_info;
+  op_ret = store->create_bucket(*(s->user),
+                                bucket,
+                                store->get_zonegroup().get_id(),
+                                placement_rule, binfo.swift_ver_location,
+                                pquota_info, attrs,
+                                out_info, pobjv, &ep_objv, creation_time,
+                                pmaster_bucket, true);
+  /* continue if EEXIST and create_bucket will fail below.  this way we can
+   * recover from a partial create by retrying it. */
+  ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << op_ret
+                    << ", bucket=" << bucket << dendl;
+
+  if (op_ret && op_ret != -EEXIST) {
+    return op_ret;
+  }
+
+  const bool existed = (op_ret == -EEXIST);
+  if (existed) {
+    /* bucket already existed, might have raced with another bucket creation, or
+     * might be partial bucket creation that never completed. Read existing bucket
+     * info, verify that the reported bucket owner is the current user.
+     * If all is ok then update the user's list of buckets.
+     * Otherwise inform client about a name conflict.
+     */
+    if (out_info.owner.compare(s->user->user_id) != 0) {
+      op_ret = -EEXIST;
+      ldout(s->cct, 20) << "bulk upload: conflicting bucket name" << dendl;
+      return op_ret;
+    }
+    bucket = out_info.bucket;
+  }
+
+  op_ret = rgw_link_bucket(store, s->user->user_id, bucket,
+                           out_info.creation_time, false);
+  if (op_ret && !existed && op_ret != -EEXIST) {
+    /* if it exists (or previously existed), don't remove it! */
+    op_ret = rgw_unlink_bucket(store, s->user->user_id,
+                               bucket.tenant, bucket.name);
+    if (op_ret < 0) {
+      ldout(s->cct, 0) << "bulk upload: WARNING: failed to unlink bucket: ret="
+                       << op_ret << dendl;
+    }
+  } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
+    ldout(s->cct, 20) << "bulk upload: containers already exists"
+                      << dendl;
+    op_ret = -ERR_BUCKET_EXISTS;
+  }
+
+  return op_ret;
+}
+
 void RGWBulkUploadOp::execute()
 {
   ceph::bufferlist buffer(64 * 1024);
@@ -5564,6 +5778,8 @@ void RGWBulkUploadOp::execute()
         }
         case rgw::tar::FileType::DIRECTORY: {
           ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl;
+
+          op_ret = handle_dir(header->get_filename());
           break;
         }
         default: {
index c17c4dbe3862da2568046c9fdef97de68314ea93..c671f85072f4445c844f22502aba7d63fb8afd8a 100644 (file)
@@ -395,6 +395,11 @@ protected:
   virtual std::unique_ptr<StreamGetter> create_stream() = 0;
   virtual void send_response() = 0;
 
+  boost::optional<std::pair<std::string, rgw_obj_key>>
+  parse_path(const boost::string_ref& path);
+
+  int handle_dir_verify_permission();
+  int handle_dir(boost::string_ref path);
 public:
   int verify_permission() override;
   void pre_exec() override;