]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Zipper - Implement Delete op on RGWObject
authorDaniel Gryniewicz <dang@redhat.com>
Thu, 23 Jul 2020 17:28:42 +0000 (13:28 -0400)
committerDaniel Gryniewicz <dang@redhat.com>
Mon, 17 Aug 2020 16:30:58 +0000 (12:30 -0400)
Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
src/rgw/rgw_op.cc
src/rgw/rgw_sal.cc
src/rgw/rgw_sal.h
src/rgw/rgw_user.cc

index 60afb885fe16791da48db2d2fe05a073fdb77e39..aa9fa9f07944cf13612cf855dbdeb9b5e9566342 100644 (file)
@@ -3432,7 +3432,7 @@ void RGWDeleteBucket::execute()
     }
   }
 
-  op_ret = s->bucket->remove_bucket(false, prefix, delimiter, s->yield);
+  op_ret = s->bucket->remove_bucket(false, prefix, delimiter, false, nullptr, s->yield);
 
   if (op_ret < 0 && op_ret == -ECANCELED) {
       // lost a race, either with mdlog sync or another delete bucket operation.
@@ -4791,28 +4791,20 @@ void RGWDeleteObj::execute()
     }
 
     if (!ver_restored) {
+      uint64_t epoch;
+
       /* Swift's versioning mechanism hasn't found any previous version of
        * the object that could be restored. This means we should proceed
        * with the regular delete path. */
-      RGWRados::Object del_target(store->getRados(), s->bucket->get_info(), *obj_ctx, s->object->get_obj());
-      RGWRados::Object::Delete del_op(&del_target);
-
-      op_ret = get_system_versioning_params(s, &del_op.params.olh_epoch,
-                                            &del_op.params.marker_version_id);
+      op_ret = get_system_versioning_params(s, &epoch, &version_id);
       if (op_ret < 0) {
-        return;
+       return;
       }
 
-      del_op.params.bucket_owner = s->bucket_owner.get_id();
-      del_op.params.versioning_status = s->bucket->get_info().versioning_status();
-      del_op.params.obj_owner = s->owner;
-      del_op.params.unmod_since = unmod_since;
-      del_op.params.high_precision_time = s->system_request; /* system request uses high precision time */
-
-      op_ret = del_op.delete_obj(s->yield);
+      op_ret = s->object->delete_object(obj_ctx, s->owner, s->bucket_owner, unmod_since,
+                                       s->system_request, epoch, version_id, s->yield);
       if (op_ret >= 0) {
-        delete_marker = del_op.result.delete_marker;
-        version_id = del_op.result.version_id;
+       delete_marker = s->object->get_delete_marker();
       }
 
       /* Check whether the object has expired. Swift API documentation
@@ -6473,14 +6465,15 @@ void RGWDeleteMultiObj::execute()
   for (iter = multi_delete->objects.begin();
         iter != multi_delete->objects.end();
         ++iter) {
-    rgw::sal::RGWRadosObject obj(store, *iter, bucket);
+    std::string version_id;
+    std::unique_ptr<rgw::sal::RGWObject> obj = bucket->get_object(*iter);
     if (s->iam_policy || ! s->iam_user_policies.empty()) {
       auto usr_policy_res = eval_user_policies(s->iam_user_policies, s->env,
                                               boost::none,
                                               iter->instance.empty() ?
                                               rgw::IAM::s3DeleteObject :
                                               rgw::IAM::s3DeleteObjectVersion,
-                                              ARN(obj.get_obj()));
+                                              ARN(obj->get_obj()));
       if (usr_policy_res == Effect::Deny) {
         send_partial_response(*iter, false, "", -EACCES);
         continue;
@@ -6493,7 +6486,7 @@ void RGWDeleteMultiObj::execute()
                                   iter->instance.empty() ?
                                   rgw::IAM::s3DeleteObject :
                                   rgw::IAM::s3DeleteObjectVersion,
-                                  ARN(obj.get_obj()));
+                                  ARN(obj->get_obj()));
       }
       if ((e == Effect::Deny) ||
          (usr_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
@@ -6502,30 +6495,23 @@ void RGWDeleteMultiObj::execute()
       }
     }
 
-    obj.set_atomic(obj_ctx);
-
-    RGWRados::Object del_target(store->getRados(), s->bucket->get_info(), *obj_ctx, obj.get_obj());
-    RGWRados::Object::Delete del_op(&del_target);
+    obj->set_atomic(obj_ctx);
 
-    del_op.params.bucket_owner = s->bucket_owner.get_id();
-    del_op.params.versioning_status = s->bucket->get_info().versioning_status();
-    del_op.params.obj_owner = s->owner;
-
-    op_ret = del_op.delete_obj(s->yield);
+    op_ret = obj->delete_object(obj_ctx, s->owner, s->bucket_owner, ceph::real_time(),
+                               false, 0, version_id, s->yield);
     if (op_ret == -ENOENT) {
       op_ret = 0;
     }
 
-    send_partial_response(*iter, del_op.result.delete_marker,
-                         del_op.result.version_id, op_ret);
+    send_partial_response(*iter, obj->get_delete_marker(), version_id, op_ret);
 
-    const auto obj_state = obj_ctx->get_state(obj.get_obj());
+    const auto obj_state = obj_ctx->get_state(obj->get_obj());
     bufferlist etag_bl;
     const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
-    obj.set_obj_size(obj_state->size);
+    obj->set_obj_size(obj_state->size);
 
-    const auto ret = rgw::notify::publish(s, &obj, ceph::real_clock::now(), etag,
-            del_op.result.delete_marker && obj.get_instance().empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+    const auto ret = rgw::notify::publish(s, obj.get(), ceph::real_clock::now(), etag,
+            obj->get_delete_marker() && s->object->get_instance().empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
             store);
     if (ret < 0) {
         ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
@@ -6572,68 +6558,42 @@ bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
 
 bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
 {
-  auto& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-
-  RGWBucketInfo binfo;
-  map<string, bufferlist> battrs;
+  std::unique_ptr<rgw::sal::RGWBucket> bucket;
   ACLOwner bowner;
   RGWObjVersionTracker ot;
 
-  rgw_bucket b(rgw_bucket_key(s->user->get_tenant(), path.bucket_name));
+  int ret = store->get_bucket(s->user, s->user->get_tenant(), path.bucket_name, &bucket);
+  if (ret < 0) {
+    goto binfo_fail;
+  }
 
-  int ret = store->ctl()->bucket->read_bucket_info(b, &binfo, s->yield,
-                                               RGWBucketCtl::BucketInstance::GetParams()
-                                                 .set_attrs(&battrs),
-                                               &ot);
+  ret = bucket->get_bucket_info(s->yield);
   if (ret < 0) {
     goto binfo_fail;
   }
 
-  if (!verify_permission(binfo, battrs, bowner)) {
+  if (!verify_permission(bucket->get_info(), bucket->get_attrs().attrs, bowner)) {
     ret = -EACCES;
     goto auth_fail;
   }
 
   if (!path.obj_key.empty()) {
-    rgw_obj obj(binfo.bucket, path.obj_key);
-    obj_ctx.set_atomic(obj);
+    ACLOwner bucket_owner;
+    std::string version_id;
 
-    RGWRados::Object del_target(store->getRados(), binfo, obj_ctx, obj);
-    RGWRados::Object::Delete del_op(&del_target);
+    bucket_owner.set_id(bucket->get_info().owner);
+    std::unique_ptr<rgw::sal::RGWObject> obj = bucket->get_object(path.obj_key);
+    obj->set_atomic(s->obj_ctx);
 
-    del_op.params.bucket_owner = binfo.owner;
-    del_op.params.versioning_status = binfo.versioning_status();
-    del_op.params.obj_owner = bowner;
-
-    ret = del_op.delete_obj(s->yield);
+    ret = obj->delete_object(s->obj_ctx, bowner, bucket_owner, ceph::real_time(), false, 0, version_id, s->yield);
     if (ret < 0) {
       goto delop_fail;
     }
   } else {
-    ret = store->getRados()->delete_bucket(binfo, ot, s->yield);
-    if (0 == ret) {
-      ret = store->ctl()->bucket->unlink_bucket(binfo.owner, binfo.bucket, s->yield, false);
-      if (ret < 0) {
-        ldpp_dout(s, 0) << "WARNING: failed to unlink bucket: ret=" << ret << dendl;
-      }
-    }
+    ret = bucket->remove_bucket(false, string(), string(), true, &s->info, s->yield);
     if (ret < 0) {
       goto delop_fail;
     }
-
-    if (!store->svc()->zone->is_meta_master()) {
-      bufferlist in_data;
-      ret = forward_request_to_master(s, &ot.read_version, store, in_data,
-                                      nullptr);
-      if (ret < 0) {
-        if (ret == -ENOENT) {
-          /* adjust error, we want to return with NoSuchBucket and not
-           * NoSuchKey */
-          ret = -ERR_NO_SUCH_BUCKET;
-        }
-        goto delop_fail;
-      }
-    }
   }
 
   num_deleted++;
index 9ca71f4745d0db47bec46a5b1d9fdcee87fc345f..33601689a2e1fca706e5b23a3dfafd1e4c7207c9 100644 (file)
@@ -81,7 +81,7 @@ RGWObject *RGWRadosBucket::create_object(const rgw_obj_key &key)
   return nullptr;
 }
 
-int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std::string delimiter, optional_yield y)
+int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y)
 {
   int ret;
 
@@ -120,9 +120,12 @@ int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std:
     }
   } while(is_truncated);
 
-  ret = abort_bucket_multiparts(store, store->ctx(), info, prefix, delimiter);
-  if (ret < 0) {
-    return ret;
+  /* If there's a prefix, then we are aborting multiparts as well */
+  if (!prefix.empty()) {
+    ret = abort_bucket_multiparts(store, store->ctx(), info, prefix, delimiter);
+    if (ret < 0) {
+      return ret;
+    }
   }
 
   ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
@@ -130,11 +133,11 @@ int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std:
      ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
   }
 
-  RGWObjVersionTracker objv_tracker;
+  RGWObjVersionTracker ot;
 
   // if we deleted children above we will force delete, as any that
   // remain is detrius from a prior bug
-  ret = store->getRados()->delete_bucket(info, objv_tracker, null_yield, !delete_children);
+  ret = store->getRados()->delete_bucket(info, ot, null_yield, !delete_children);
   if (ret < 0) {
     lderr(store->ctx()) << "ERROR: could not remove bucket " <<
       info.bucket.name << dendl;
@@ -146,6 +149,19 @@ int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std:
     lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
   }
 
+  if (forward_to_master && !store->svc()->zone->is_meta_master()) {
+    bufferlist in_data;
+    ret = store->forward_request_to_master(owner, &ot.read_version, in_data, nullptr, *req_info);
+    if (ret < 0) {
+      if (ret == -ENOENT) {
+       /* adjust error, we want to return with NoSuchBucket and not
+        * NoSuchKey */
+       ret = -ERR_NO_SUCH_BUCKET;
+      }
+      return ret;
+    }
+  }
+
   return ret;
 }
 
@@ -506,9 +522,9 @@ int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
   return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals);
 }
 
-RGWObject::ReadOp* RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
+std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
 {
-  return new RGWRadosObject::RadosReadOp(this, ctx);
+  return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
 }
 
 RGWRadosObject::RadosReadOp::RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx) :
@@ -552,6 +568,36 @@ int RGWRadosObject::RadosReadOp::read(int64_t ofs, int64_t end, bufferlist& bl,
   return parent_op.read(ofs, end, bl, y);
 }
 
+int RGWRadosObject::RadosReadOp::get_manifest(RGWObjManifest **pmanifest,
+                                             optional_yield y)
+{
+  return op_target.get_manifest(pmanifest, y);
+}
+
+int RGWRadosObject::delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner, ACLOwner bucket_owner, ceph::real_time unmod_since, bool high_precision_time, uint64_t epoch, string& version_id, optional_yield y)
+{
+  int ret = 0;
+  RGWRados::Object del_target(store->getRados(), bucket->get_info(), *obj_ctx, get_obj());
+  RGWRados::Object::Delete del_op(&del_target);
+
+  del_op.params.olh_epoch = epoch;
+  del_op.params.marker_version_id = version_id;
+  del_op.params.bucket_owner = bucket_owner.get_id();
+  del_op.params.versioning_status = bucket->get_info().versioning_status();
+  del_op.params.obj_owner = obj_owner;
+  del_op.params.unmod_since = unmod_since;
+  del_op.params.high_precision_time = high_precision_time;
+
+  ret = del_op.delete_obj(y);
+  if (ret >= 0) {
+    delete_marker = del_op.result.delete_marker;
+    version_id = del_op.result.version_id;
+  }
+
+  return ret;
+}
+
+
 int RGWRadosObject::RadosReadOp::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y)
 {
   return parent_op.iterate(ofs, end, cb, y);
index ab405850e452357935ce98639f59baf410cf06cf..b5068244c318643114edfea413a1dd4f6fad3989 100644 (file)
@@ -160,7 +160,7 @@ class RGWBucket {
     virtual RGWObject* create_object(const rgw_obj_key& key /* Attributes */) = 0;
     virtual RGWAttrs& get_attrs(void) { return attrs; }
     virtual int set_attrs(RGWAttrs a) { attrs = a; return 0; }
-    virtual int remove_bucket(bool delete_children, std::string prefix, std::string delimiter, optional_yield y) = 0;
+    virtual int remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y) = 0;
     virtual RGWAccessControlPolicy& get_acl(void) = 0;
     virtual int set_acl(RGWAccessControlPolicy& acl, optional_yield y) = 0;
     virtual int get_bucket_info(optional_yield y) = 0;
@@ -274,6 +274,7 @@ class RGWObject {
     uint64_t obj_size;
     RGWAttrs attrs;
     ceph::real_time mtime;
+    bool delete_marker;
 
   public:
 
@@ -303,28 +304,34 @@ class RGWObject {
       index_hash_source(),
       obj_size(),
       attrs(),
-      mtime() {}
+      mtime(),
+      delete_marker(false) {}
     RGWObject(const rgw_obj_key& _k)
       : key(_k),
       bucket(),
       index_hash_source(),
       obj_size(),
       attrs(),
-      mtime() {}
+      mtime(),
+      delete_marker(false) {}
     RGWObject(const rgw_obj_key& _k, RGWBucket* _b)
       : key(_k),
       bucket(_b),
       index_hash_source(),
       obj_size(),
       attrs(),
-      mtime() {}
+      mtime(),
+      delete_marker(false) {}
     RGWObject(RGWObject& _o) = default;
 
     virtual ~RGWObject() = default;
 
     virtual int read(off_t offset, off_t length, std::iostream& stream) = 0;
     virtual int write(off_t offset, off_t length, std::iostream& stream) = 0;
-    virtual int delete_object(void) = 0;
+    virtual int delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner,
+                             ACLOwner bucket_owner, ceph::real_time unmod_since,
+                             bool high_precision_time, uint64_t epoch,
+                             string& version_id,optional_yield y) = 0;
     virtual RGWAccessControlPolicy& get_acl(void) = 0;
     virtual int set_acl(const RGWAccessControlPolicy& acl) = 0;
     virtual void set_atomic(RGWObjectCtx *rctx) const = 0;
@@ -349,10 +356,11 @@ class RGWObject {
     std::string get_hash_source(void) { return index_hash_source; }
     void set_hash_source(std::string s) { index_hash_source = s; }
     std::string get_oid(void) const { return key.get_oid(); }
+    bool get_delete_marker(void) { return delete_marker; }
     int range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end);
 
     /* OPs */
-    virtual ReadOp* get_read_op(RGWObjectCtx *) = 0;
+    virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) = 0;
 
     /* OMAP */
     virtual int omap_get_vals_by_keys(const std::string& oid,
@@ -421,6 +429,7 @@ class RGWRadosObject : public RGWObject {
     RGWAccessControlPolicy acls;
 
   public:
+
     struct RadosReadOp : public ReadOp {
     private:
       RGWRadosObject* source;
@@ -452,7 +461,10 @@ class RGWRadosObject : public RGWObject {
 
     int read(off_t offset, off_t length, std::iostream& stream) { return length; }
     int write(off_t offset, off_t length, std::iostream& stream) { return length; }
-    int delete_object(void) { return 0; }
+    virtual int delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner,
+                             ACLOwner bucket_owner, ceph::real_time unmod_since,
+                             bool high_precision_time, uint64_t epoch,
+                             string& version_id,optional_yield y) override;
     RGWAccessControlPolicy& get_acl(void) { return acls; }
     int set_acl(const RGWAccessControlPolicy& acl) { acls = acl; return 0; }
     virtual void set_atomic(RGWObjectCtx *rctx) const;
@@ -471,7 +483,7 @@ class RGWRadosObject : public RGWObject {
     }
 
     /* OPs */
-    virtual ReadOp* get_read_op(RGWObjectCtx *) override;
+    virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) override;
 
     /* OMAP */
     virtual int omap_get_vals_by_keys(const std::string& oid,
@@ -536,7 +548,7 @@ class RGWRadosBucket : public RGWBucket {
     RGWBucketList* list(void) { return new RGWBucketList(); }
     virtual int list(ListParams&, int, ListResults&, optional_yield y) override;
     RGWObject* create_object(const rgw_obj_key& key /* Attributes */) override;
-    virtual int remove_bucket(bool delete_children, std::string prefix, std::string delimiter, optional_yield y) override;
+    virtual int remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y) override;
     RGWAccessControlPolicy& get_acl(void) { return acls; }
     virtual int set_acl(RGWAccessControlPolicy& acl, optional_yield y) override;
     virtual int get_bucket_info(optional_yield y) override;
@@ -611,16 +623,14 @@ class RGWRadosStore : public RGWStore {
 
     int get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj,
                           librados::IoCtx *ioctx);
+    int forward_request_to_master(RGWUser* user, obj_version *objv,
+                                 bufferlist& in_data, JSONParser *jp, req_info& info);
 
     // implements DoutPrefixProvider
     std::ostream& gen_prefix(std::ostream& out) const { return out << "RGWRadosStore "; }
     CephContext* get_cct() const override { return rados->ctx(); }
     unsigned get_subsys() const override { return ceph_subsys_rgw; }
 
-  private:
-    int forward_request_to_master(RGWUser* user, obj_version *objv,
-                                 bufferlist& in_data, JSONParser *jp, req_info& info);
-
 };
 
 } } // namespace rgw::sal
index d6a7aa74b982964f9058dffcdfb47d917b4f233c..bb665a27e36e58a249477f948d844285e8a2a0c8 100644 (file)
@@ -1889,7 +1889,7 @@ int RGWUser::execute_remove(RGWUserAdminOpState& op_state, std::string *err_msg,
 
     std::string prefix, delimiter;
     for (auto it = m.begin(); it != m.end(); ++it) {
-      ret = it->second->remove_bucket(true, prefix, delimiter, y);
+      ret = it->second->remove_bucket(true, prefix, delimiter, false, nullptr, y);
       if (ret < 0) {
         set_err_msg(err_msg, "unable to delete user data");
         return ret;