]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW Zipper - Remove link/unlink from API 41108/head
authorDaniel Gryniewicz <dang@redhat.com>
Tue, 20 Apr 2021 18:07:11 +0000 (14:07 -0400)
committerDaniel Gryniewicz <dang@redhat.com>
Thu, 6 May 2021 13:41:32 +0000 (09:41 -0400)
link() and unlink() are an artifact of the representation of buckets and
users in RADOS, and don't need to be in the API.  Instead, fix create()
and chown() to behave properly.

Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_op.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h
src/rgw/rgw_user.cc

index 5ab35c5e1a892aa3642958925dcb0fcdb9673281..7df17043bdd535a5282447a43dbff3b54cf04b9c 100644 (file)
@@ -283,7 +283,7 @@ void check_bad_user_bucket_mapping(rgw::sal::Store* store, rgw::sal::User* user,
         cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
         if (fix) {
           cout << "fixing" << std::endl;
-         r = actual_bucket->link(dpp, user, null_yield);
+         r = actual_bucket->chown(dpp, user, nullptr, null_yield);
           if (r < 0) {
             cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
           }
@@ -537,131 +537,6 @@ bool rgw_find_bucket_by_id(const DoutPrefixProvider *dpp, CephContext *cct, rgw:
   return false;
 }
 
-int RGWBucket::link(RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp,
-                    std::string *err_msg)
-{
-  if (!op_state.is_user_op()) {
-    set_err_msg(err_msg, "empty user id");
-    return -EINVAL;
-  }
-
-  string bucket_id = op_state.get_bucket_id();
-  std::string display_name = op_state.get_user_display_name();
-  std::unique_ptr<rgw::sal::Bucket> loc_bucket;
-  std::unique_ptr<rgw::sal::Bucket> old_bucket;
-
-  loc_bucket = op_state.get_bucket()->clone();
-
-  if (!bucket_id.empty() && bucket_id != loc_bucket->get_bucket_id()) {
-    set_err_msg(err_msg,
-       "specified bucket id does not match " + loc_bucket->get_bucket_id());
-    return -EINVAL;
-  }
-
-  old_bucket = loc_bucket->clone();
-
-  loc_bucket->get_key().tenant = op_state.get_user_id().tenant;
-
-  if (!op_state.new_bucket_name.empty()) {
-    auto pos = op_state.new_bucket_name.find('/');
-    if (pos != string::npos) {
-      loc_bucket->get_key().tenant = op_state.new_bucket_name.substr(0, pos);
-      loc_bucket->get_key().name = op_state.new_bucket_name.substr(pos + 1);
-    } else {
-      loc_bucket->get_key().name = op_state.new_bucket_name;
-    }
-  }
-
-  RGWObjVersionTracker objv_tracker;
-  RGWObjVersionTracker old_version = loc_bucket->get_info().objv_tracker;
-
-  map<string, bufferlist>::iterator aiter = loc_bucket->get_attrs().find(RGW_ATTR_ACL);
-  if (aiter == loc_bucket->get_attrs().end()) {
-       // should never happen; only pre-argonaut buckets lacked this.
-    ldpp_dout(dpp, 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket << dendl;
-    set_err_msg(err_msg,
-       "While crossing the Anavros you have displeased the goddess Hera."
-       "  You must sacrifice your ancient bucket " + loc_bucket->get_bucket_id());
-    return -EINVAL;
-  }
-  bufferlist& aclbl = aiter->second;
-  RGWAccessControlPolicy policy;
-  ACLOwner owner;
-  try {
-   auto iter = aclbl.cbegin();
-   decode(policy, iter);
-   owner = policy.get_owner();
-  } catch (buffer::error& err) {
-    set_err_msg(err_msg, "couldn't decode policy");
-    return -EIO;
-  }
-
-  std::unique_ptr<rgw::sal::User> owner_user = store->get_user(owner.get_id());
-
-  int r = old_bucket->unlink(dpp, owner_user.get(), y, false);
-  if (r < 0) {
-    set_err_msg(err_msg, "could not unlink policy from user " + owner_user->get_id().to_str());
-    return r;
-  }
-
-  // now update the user for the bucket...
-  if (display_name.empty()) {
-    ldpp_dout(dpp, 0) << "WARNING: user " << user << " has no display name set" << dendl;
-  }
-
-  RGWAccessControlPolicy policy_instance;
-  policy_instance.create_default(user->get_id(), display_name);
-  owner = policy_instance.get_owner();
-
-  aclbl.clear();
-  policy_instance.encode(aclbl);
-
-  bool exclusive = false;
-  loc_bucket->get_info().owner = user->get_id();
-  if (*loc_bucket != *old_bucket) {
-  ldpp_dout(dpp, 20) << "Lando 10" << dendl;
-    loc_bucket->get_info().bucket = loc_bucket->get_key();
-    loc_bucket->get_info().objv_tracker.version_for_read()->ver = 0;
-    exclusive = true;
-  }
-
-  ldpp_dout(dpp, 20) << "Lando 11" << dendl;
-  r = loc_bucket->put_instance_info(dpp, exclusive, ceph::real_time());
-  ldpp_dout(dpp, 20) << "Lando 12 r=" << r << dendl;
-  if (r < 0) {
-    set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
-    return r;
-  }
-
-  RGWObjVersionTracker objv;
-
-  /* link to user */
-  ldpp_dout(dpp, 20) << "Lando 13" << dendl;
-  r = loc_bucket->link(dpp, user.get(), y, true, &objv);
-  ldpp_dout(dpp, 20) << "Lando 14 r=" << r << dendl;
-  if (r < 0) {
-    set_err_msg(err_msg, "failed to relink bucket");
-    return r;
-  }
-
-  if (*loc_bucket != *old_bucket) {
-    // like RGWRados::delete_bucket -- excepting no bucket_index work.
-    r = old_bucket->remove_entrypoint(dpp, &objv, y);
-    if (r < 0) {
-      set_err_msg(err_msg, "failed to unlink old bucket endpoint " + old_bucket->get_tenant() + "/" + old_bucket->get_name());
-      return r;
-    }
-
-    r = old_bucket->remove_instance_info(dpp, &old_version, y);
-    if (r < 0) {
-      set_err_msg(err_msg, "failed to unlink old bucket info");
-      return r;
-    }
-  }
-
-  return 0;
-}
-
 int RGWBucket::chown(RGWBucketAdminOpState& op_state, const string& marker,
                      optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg)
 {
@@ -673,22 +548,6 @@ int RGWBucket::chown(RGWBucketAdminOpState& op_state, const string& marker,
   return ret;
 }
 
-int RGWBucket::unlink(RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg)
-{
-  bucket = op_state.get_bucket()->clone();
-  if (!op_state.is_user_op()) {
-    set_err_msg(err_msg, "could not fetch user or user bucket info");
-    return -EINVAL;
-  }
-
-  int r = bucket->unlink(dpp, user.get(), y);
-  if (r < 0) {
-    set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
-  }
-
-  return r;
-}
-
 int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, std::string *err_msg)
 {
   bucket = op_state.get_bucket()->clone();
@@ -1095,18 +954,135 @@ int RGWBucketAdminOp::unlink(rgw::sal::Store* store, RGWBucketAdminOpState& op_s
   if (ret < 0)
     return ret;
 
-  return bucket.unlink(op_state, null_yield, dpp);
+  return static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->unlink_bucket(op_state.get_user_id(), op_state.get_bucket()->get_info().bucket, null_yield, dpp, true);
 }
 
 int RGWBucketAdminOp::link(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, string *err)
 {
+  if (!op_state.is_user_op()) {
+    set_err_msg(err, "empty user id");
+    return -EINVAL;
+  }
+
   RGWBucket bucket;
   int ret = bucket.init(store, op_state, null_yield, dpp, err);
   if (ret < 0)
     return ret;
 
-  return bucket.link(op_state, null_yield, dpp, err);
+  string bucket_id = op_state.get_bucket_id();
+  std::string display_name = op_state.get_user_display_name();
+  std::unique_ptr<rgw::sal::Bucket> loc_bucket;
+  std::unique_ptr<rgw::sal::Bucket> old_bucket;
 
+  loc_bucket = op_state.get_bucket()->clone();
+
+  if (!bucket_id.empty() && bucket_id != loc_bucket->get_bucket_id()) {
+    set_err_msg(err,
+       "specified bucket id does not match " + loc_bucket->get_bucket_id());
+    return -EINVAL;
+  }
+
+  old_bucket = loc_bucket->clone();
+
+  loc_bucket->get_key().tenant = op_state.get_user_id().tenant;
+
+  if (!op_state.new_bucket_name.empty()) {
+    auto pos = op_state.new_bucket_name.find('/');
+    if (pos != string::npos) {
+      loc_bucket->get_key().tenant = op_state.new_bucket_name.substr(0, pos);
+      loc_bucket->get_key().name = op_state.new_bucket_name.substr(pos + 1);
+    } else {
+      loc_bucket->get_key().name = op_state.new_bucket_name;
+    }
+  }
+
+  RGWObjVersionTracker objv_tracker;
+  RGWObjVersionTracker old_version = loc_bucket->get_info().objv_tracker;
+
+  map<string, bufferlist>::iterator aiter = loc_bucket->get_attrs().find(RGW_ATTR_ACL);
+  if (aiter == loc_bucket->get_attrs().end()) {
+       // should never happen; only pre-argonaut buckets lacked this.
+    ldpp_dout(dpp, 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket << dendl;
+    set_err_msg(err,
+       "While crossing the Anavros you have displeased the goddess Hera."
+       "  You must sacrifice your ancient bucket " + loc_bucket->get_bucket_id());
+    return -EINVAL;
+  }
+  bufferlist& aclbl = aiter->second;
+  RGWAccessControlPolicy policy;
+  ACLOwner owner;
+  try {
+   auto iter = aclbl.cbegin();
+   decode(policy, iter);
+   owner = policy.get_owner();
+  } catch (buffer::error& e) {
+    set_err_msg(err, "couldn't decode policy");
+    return -EIO;
+  }
+
+  int r = static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->unlink_bucket(owner.get_id(), old_bucket->get_info().bucket, null_yield, dpp, false);
+  if (r < 0) {
+    set_err_msg(err, "could not unlink policy from user " + owner.get_id().to_str());
+    return r;
+  }
+
+  // now update the user for the bucket...
+  if (display_name.empty()) {
+    ldpp_dout(dpp, 0) << "WARNING: user " << op_state.get_user_id() << " has no display name set" << dendl;
+  }
+
+  RGWAccessControlPolicy policy_instance;
+  policy_instance.create_default(op_state.get_user_id(), display_name);
+  owner = policy_instance.get_owner();
+
+  aclbl.clear();
+  policy_instance.encode(aclbl);
+
+  bool exclusive = false;
+  loc_bucket->get_info().owner = op_state.get_user_id();
+  if (*loc_bucket != *old_bucket) {
+    loc_bucket->get_info().bucket = loc_bucket->get_key();
+    loc_bucket->get_info().objv_tracker.version_for_read()->ver = 0;
+    exclusive = true;
+  }
+
+  r = loc_bucket->put_instance_info(dpp, exclusive, ceph::real_time());
+  if (r < 0) {
+    set_err_msg(err, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
+    return r;
+  }
+
+  /* link to user */
+  RGWBucketEntryPoint ep;
+  ep.bucket = loc_bucket->get_info().bucket;
+  ep.owner = op_state.get_user_id();
+  ep.creation_time = loc_bucket->get_info().creation_time;
+  ep.linked = true;
+  rgw::sal::Attrs ep_attrs;
+  rgw_ep_info ep_data{ep, ep_attrs};
+
+  r = static_cast<rgw::sal::RadosStore*>(store)->ctl()->bucket->link_bucket(op_state.get_user_id(), loc_bucket->get_info().bucket, loc_bucket->get_info().creation_time, null_yield, dpp, true, &ep_data);
+  if (r < 0) {
+    set_err_msg(err, "failed to relink bucket");
+    return r;
+  }
+
+  if (*loc_bucket != *old_bucket) {
+    // like RGWRados::delete_bucket -- excepting no bucket_index work.
+    r = old_bucket->remove_entrypoint(dpp, &ep_data.ep_objv, null_yield);
+    if (r < 0) {
+      set_err_msg(err, "failed to unlink old bucket endpoint " + old_bucket->get_tenant() + "/" + old_bucket->get_name());
+      return r;
+    }
+
+    r = old_bucket->remove_instance_info(dpp, &old_version, null_yield);
+    if (r < 0) {
+      set_err_msg(err, "failed to unlink old bucket info");
+      return r;
+    }
+  }
+
+  return 0;
 }
 
 int RGWBucketAdminOp::chown(rgw::sal::Store* store, RGWBucketAdminOpState& op_state, const string& marker, const DoutPrefixProvider *dpp, string *err)
@@ -1117,10 +1093,6 @@ int RGWBucketAdminOp::chown(rgw::sal::Store* store, RGWBucketAdminOpState& op_st
   if (ret < 0)
     return ret;
 
-  ret = bucket.link(op_state, null_yield, dpp, err);
-  if (ret < 0)
-    return ret;
-
   return bucket.chown(op_state, marker, null_yield, dpp, err);
 
 }
index ae0df7f6a1f1e6ef5df3fb46f309ec65bfe6e07a..36df4a52499688337eb49be03c9241d2a5c37061 100644 (file)
@@ -349,11 +349,8 @@ public:
           map<RGWObjCategory, RGWStorageStats>& calculated_stats,
           std::string *err_msg = NULL);
 
-  int link(RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp,
-           std::string *err_msg = NULL);
   int chown(RGWBucketAdminOpState& op_state, const string& marker,
             optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg = NULL);
-  int unlink(RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg = NULL);
   int set_quota(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *dpp, std::string *err_msg = NULL);
 
   int remove_object(const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, std::string *err_msg = NULL);
index bbfdc749884168997ba40923740de2f5217b4098..f39645fdde61dcaa01bf137be2db6ebf707d57d9 100644 (file)
@@ -3162,7 +3162,7 @@ void RGWCreateBucket::execute(optional_yield y)
 
   /* We're replacing bucket with the newly created one */
   ldpp_dout(this, 10) << "user=" << s->user << " bucket=" << tmp_bucket << dendl;
-  op_ret = store->create_bucket(this, *s->user, tmp_bucket, zonegroup_id,
+  op_ret = store->create_bucket(this, s->user.get(), tmp_bucket, zonegroup_id,
                                placement_rule,
                                info.swift_ver_location,
                                pquota_info, policy, attrs, info, ep_objv,
@@ -3177,31 +3177,6 @@ void RGWCreateBucket::execute(optional_yield y)
     return;
 
   const bool existed = s->bucket_exists;
-  if (existed) {
-    /* bucket already existed, might have raced with another bucket creation, or
-     * might be partial bucket creation that never completed. Read existing bucket
-     * info, verify that the reported bucket owner is the current user.
-     * If all is ok then update the user's list of buckets.
-     * Otherwise inform client about a name conflict.
-     */
-    if (s->bucket->get_info().owner.compare(s->user->get_id()) != 0) {
-      op_ret = -EEXIST;
-      return;
-    }
-  }
-
-  op_ret = s->bucket->link(this, s->user.get(), y, false);
-  if (op_ret && !existed && op_ret != -EEXIST) {
-    /* if it exists (or previously existed), don't remove it! */
-    op_ret = s->bucket->unlink(this, s->user.get(), y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
-                      << dendl;
-    }
-  } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
-    op_ret = -ERR_BUCKET_EXISTS;
-  }
-
   if (need_metadata_upload() && existed) {
     /* OK, it looks we lost race with another request. As it's required to
      * handle metadata fusion and upload, the whole operation becomes very
@@ -6911,7 +6886,7 @@ int RGWBulkUploadOp::handle_dir(const std::string_view path, optional_yield y)
   placement_rule.storage_class = s->info.storage_class;
   forward_req_info(s->cct, info, bucket_name);
 
-  op_ret = store->create_bucket(this, *s->user, new_bucket,
+  op_ret = store->create_bucket(this, s->user.get(), new_bucket,
                                 store->get_zone()->get_zonegroup().get_id(),
                                 placement_rule, swift_ver_location,
                                 pquota_info, policy, attrs,
@@ -6923,37 +6898,6 @@ int RGWBulkUploadOp::handle_dir(const std::string_view path, optional_yield y)
   ldpp_dout(this, 20) << "rgw_create_bucket returned ret=" << op_ret
       << ", bucket=" << bucket << dendl;
 
-  if (op_ret && op_ret != -EEXIST) {
-    return op_ret;
-  }
-
-  const bool existed = (op_ret == -EEXIST);
-  if (existed) {
-    /* bucket already existed, might have raced with another bucket creation, or
-     * might be partial bucket creation that never completed. Read existing bucket
-     * info, verify that the reported bucket owner is the current user.
-     * If all is ok then update the user's list of buckets.
-     * Otherwise inform client about a name conflict.
-     */
-    if (out_info.owner.compare(s->user->get_id()) != 0) {
-      op_ret = -EEXIST;
-      ldpp_dout(this, 20) << "conflicting bucket name" << dendl;
-      return op_ret;
-    }
-  }
-
-  op_ret = bucket->link(this, s->user.get(), y, false);
-  if (op_ret && !existed && op_ret != -EEXIST) {
-    /* if it exists (or previously existed), don't remove it! */
-    op_ret = bucket->unlink(this, s->user.get(), y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret << dendl;
-    }
-  } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
-    ldpp_dout(this, 20) << "containers already exists" << dendl;
-    op_ret = -ERR_BUCKET_EXISTS;
-  }
-
   return op_ret;
 }
 
index 437d1d771af977f70d93b9303e870c1373b4fb44..5901b4749eb09474532b237897952719da24bc2d 100644 (file)
@@ -143,7 +143,7 @@ class Store {
     virtual int get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr<Bucket>* bucket) = 0;
     virtual int get_bucket(const DoutPrefixProvider* dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr<Bucket>* bucket, optional_yield y) = 0;
     virtual int create_bucket(const DoutPrefixProvider* dpp,
-                            User& u, const rgw_bucket& b,
+                            User* u, const rgw_bucket& b,
                             const std::string& zonegroup_id,
                             rgw_placement_rule& placement_rule,
                             std::string& swift_ver_location,
@@ -390,8 +390,6 @@ class Bucket {
     virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) = 0;
     virtual int update_container_stats(const DoutPrefixProvider* dpp) = 0;
     virtual int check_bucket_shards(const DoutPrefixProvider* dpp) = 0;
-    virtual int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr) = 0;
-    virtual int unlink(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true) = 0;
     virtual int chown(const DoutPrefixProvider* dpp, User* new_user, User* old_user, optional_yield y, const std::string* marker = nullptr) = 0;
     virtual int put_instance_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime) = 0;
     virtual int remove_entrypoint(const DoutPrefixProvider* dpp, RGWObjVersionTracker* objv, optional_yield y) = 0;
index 8c1326fc6971328d232c9308221b3ad44bf9e52f..ed730b0a633072d391856cc100cca1635615d16f 100644 (file)
@@ -68,7 +68,7 @@ static int decode_policy(CephContext* cct,
 
 static int rgw_op_get_bucket_policy_from_attr(const DoutPrefixProvider* dpp,
                                              RadosStore* store,
-                                             User& user,
+                                             User* user,
                                              Attrs& bucket_attrs,
                                              RGWAccessControlPolicy* policy,
                                              optional_yield y)
@@ -82,11 +82,11 @@ static int rgw_op_get_bucket_policy_from_attr(const DoutPrefixProvider* dpp,
   } else {
     ldout(store->ctx(), 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl;
     /* object exists, but policy is broken */
-    int r = user.load_user(dpp, y);
+    int r = user->load_user(dpp, y);
     if (r < 0)
       return r;
 
-    policy->create_default(user.get_id(), user.get_display_name());
+    policy->create_default(user->get_id(), user->get_display_name());
   }
   return 0;
 }
@@ -428,6 +428,14 @@ int RadosBucket::chown(const DoutPrefixProvider* dpp, User* new_user, User* old_
   if (marker == nullptr)
     marker = &obj_marker;
 
+  int r = this->link(dpp, new_user, y);
+  if (r < 0) {
+    return r;
+  }
+  if (!old_user) {
+    return r;
+  }
+
   return store->ctl()->bucket->chown(store, this, new_user->get_id(),
                           old_user->get_display_name(), *marker, y, dpp);
 }
@@ -701,7 +709,7 @@ int RadosStore::get_bucket(const DoutPrefixProvider* dpp, User* u, const std::st
 }
 
 int RadosStore::create_bucket(const DoutPrefixProvider* dpp,
-                                User& u, const rgw_bucket& b,
+                                User* u, const rgw_bucket& b,
                                 const std::string& zonegroup_id,
                                 rgw_placement_rule& placement_rule,
                                 std::string& swift_ver_location,
@@ -727,7 +735,7 @@ int RadosStore::create_bucket(const DoutPrefixProvider* dpp,
   obj_version objv,* pobjv = NULL;
 
   /* If it exists, look it up; otherwise create it */
-  ret = get_bucket(dpp, &u, b, &bucket, y);
+  ret = get_bucket(dpp, u, b, &bucket, y);
   if (ret < 0 && ret != -ENOENT)
     return ret;
 
@@ -747,14 +755,14 @@ int RadosStore::create_bucket(const DoutPrefixProvider* dpp,
       return -EEXIST;
     }
   } else {
-    bucket = std::unique_ptr<Bucket>(new RadosBucket(this, b, &u));
+    bucket = std::unique_ptr<Bucket>(new RadosBucket(this, b, u));
     *existed = false;
     bucket->set_attrs(attrs);
   }
 
   if (!svc()->zone->is_meta_master()) {
     JSONParser jp;
-    ret = forward_request_to_master(dpp, &u, NULL, in_data, &jp, req_info, y);
+    ret = forward_request_to_master(dpp, u, NULL, in_data, &jp, req_info, y);
     if (ret < 0) {
       return ret;
     }
@@ -786,7 +794,7 @@ int RadosStore::create_bucket(const DoutPrefixProvider* dpp,
 
   if (*existed) {
     rgw_placement_rule selected_placement_rule;
-    ret = svc()->zone->select_bucket_placement(dpp, u.get_info(),
+    ret = svc()->zone->select_bucket_placement(dpp, u->get_info(),
                                               zid, placement_rule,
                                               &selected_placement_rule, nullptr, y);
     if (selected_placement_rule != info.placement_rule) {
@@ -796,13 +804,22 @@ int RadosStore::create_bucket(const DoutPrefixProvider* dpp,
     }
   } else {
 
-    ret = getRados()->create_bucket(u.get_info(), bucket->get_key(),
+    ret = getRados()->create_bucket(u->get_info(), bucket->get_key(),
                                    zid, placement_rule, swift_ver_location, pquota_info,
                                    attrs, info, pobjv, &ep_objv, creation_time,
                                    pmaster_bucket, pmaster_num_shards, y, dpp,
                                    exclusive);
     if (ret == -EEXIST) {
       *existed = true;
+      /* bucket already existed, might have raced with another bucket creation,
+       * or might be partial bucket creation that never completed. Read existing
+       * bucket info, verify that the reported bucket owner is the current user.
+       * If all is ok then update the user's list of buckets.  Otherwise inform
+       * client about a name conflict.
+       */
+      if (info.owner.compare(u->get_id()) != 0) {
+       return -EEXIST;
+      }
       ret = 0;
     } else if (ret != 0) {
       return ret;
@@ -812,6 +829,19 @@ int RadosStore::create_bucket(const DoutPrefixProvider* dpp,
   bucket->set_version(ep_objv);
   bucket->get_info() = info;
 
+  RadosBucket* rbucket = static_cast<RadosBucket*>(bucket.get());
+  ret = rbucket->link(dpp, u, y, false);
+  if (ret && !*existed && ret != -EEXIST) {
+    /* if it exists (or previously existed), don't remove it! */
+    ret = rbucket->unlink(dpp, u, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << ret
+                      << dendl;
+    }
+  } else if (ret == -EEXIST || (ret == 0 && *existed)) {
+    ret = -ERR_BUCKET_EXISTS;
+  }
+
   bucket_out->swap(bucket);
 
   return ret;
index bc79ec83cbf2ce8c3462c54fee5ccbd9c0478ad6..0f1814e92b56958b3c706cec62e9681cc53e76f6 100644 (file)
@@ -304,8 +304,6 @@ class RadosBucket : public Bucket {
     virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
     virtual int update_container_stats(const DoutPrefixProvider* dpp) override;
     virtual int check_bucket_shards(const DoutPrefixProvider* dpp) override;
-    virtual int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint, RGWObjVersionTracker* objv) override;
-    virtual int unlink(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true) override;
     virtual int chown(const DoutPrefixProvider* dpp, User* new_user, User* old_user, optional_yield y, const std::string* marker = nullptr) override;
     virtual int put_instance_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime) override;
     virtual int remove_entrypoint(const DoutPrefixProvider* dpp, RGWObjVersionTracker* objv, optional_yield y) override;
@@ -328,6 +326,9 @@ class RadosBucket : public Bucket {
       return std::make_unique<RadosBucket>(*this);
     }
 
+  private:
+    int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
+    int unlink(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true);
     friend class RadosStore;
 };
 
@@ -374,7 +375,7 @@ class RadosStore : public Store {
     virtual int get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr<Bucket>* bucket) override;
     virtual int get_bucket(const DoutPrefixProvider* dpp, User* u, const std::string& tenant, const std::string&name, std::unique_ptr<Bucket>* bucket, optional_yield y) override;
     virtual int create_bucket(const DoutPrefixProvider* dpp,
-                            User& u, const rgw_bucket& b,
+                            User* u, const rgw_bucket& b,
                             const std::string& zonegroup_id,
                             rgw_placement_rule& placement_rule,
                             std::string& swift_ver_location,
index 8f5f1e1055fd83fc123d28592fcecdc6c94b246b..746f89ceda91c3b862e1ca90fc4e6bc4864680e0 100644 (file)
@@ -1708,12 +1708,6 @@ int RGWUser::execute_rename(const DoutPrefixProvider *dpp, RGWUserAdminOpState&
         return ret;
       }
 
-      ret = bucket->link(dpp, new_user.get(), y);
-      if (ret < 0) {
-        set_err_msg(err_msg, "failed to link bucket " + bucket->get_name());
-        return ret;
-      }
-
       ret = bucket->chown(dpp, new_user.get(), old_user.get(), y);
       if (ret < 0) {
         set_err_msg(err_msg, "failed to run bucket chown" + cpp_strerror(-ret));