]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: implement reliable has-children check (unlink dir)
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 14 Mar 2017 01:52:08 +0000 (21:52 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Thu, 16 Mar 2017 15:43:39 +0000 (11:43 -0400)
Bug report and discussion provided by
Gui Hecheng <guihecheng@cmss.chinamobile.com> in nfs-ganesha upstream
github.  Briefly, while a reliable check is potentially costly,
it is necessary.

Fixes: http://tracker.ceph.com/issues/19270
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index f1d1ef1649d4985bbf56289b31e2ec333695f69b..3bde20bad4daea6fe2393b1a91f515ba83bcd543 100644 (file)
@@ -257,8 +257,14 @@ namespace rgw {
       }
 
       std::string oname = rgw_fh->relative_object_name();
-      if (rgw_fh->is_dir())
+      if (rgw_fh->is_dir()) {
+       /* for the duration of our cache timer, trust positive
+        * child cache */
+       if (rgw_fh->has_children()) {
+         return(-ENOTEMPTY);
+       }
        oname += "/";
+      }
       RGWDeleteObjRequest req(cct, get_user(), parent->bucket_name(),
                              oname);
       rc = rgwlib.get_fe()->execute_req(&req);
@@ -845,6 +851,28 @@ namespace rgw {
     return true;
   } /* RGWFileHandle::reclaim */
 
+  bool RGWFileHandle::has_children() const
+  {
+    if (unlikely(! is_dir()))
+      return false;
+
+#if 0
+    /* XXX pretty, but unsafe w/o consistent caching */
+    const directory* d = get<directory>(&variant_type);
+    if ((d->flags & RGWFileHandle::directory::FLAG_CACHED) &&
+       (state.nlink > 2 /* . and .. */))
+      return true;
+#endif /* 0 */
+
+    RGWRMdirCheck req(fs->get_context(), fs->get_user(), this);
+    int rc = rgwlib.get_fe()->execute_req(&req);
+    if (! rc) {
+      return req.valid && req.has_children;
+    }
+
+    return false;
+  }
+
   int RGWFileHandle::readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset,
                             bool *eof, uint32_t flags)
   {
@@ -868,7 +896,7 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1);
+       set_nlink(2 + 1); // XXXX
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
@@ -880,7 +908,7 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1);
+       set_nlink(2 + 1); // XXXX
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
@@ -1039,6 +1067,7 @@ namespace rgw {
 
   void RGWFileHandle::directory::clear_state()
   {
+    flags &= ~RGWFileHandle::directory::FLAG_CACHED;
     marker_cache.clear();
   }
 
index 8c590c1ec929b7b2221c20dc70c250e9d54f6a38..15f25573c5870f6c574a054d720e449ea698df4b 100644 (file)
@@ -398,11 +398,11 @@ namespace rgw {
 
     const std::string& object_name() const { return name; }
 
-    std::string full_object_name(bool omit_bucket = false) {
+    std::string full_object_name(bool omit_bucket = false) const {
       std::string path;
       std::vector<const std::string*> segments;
       int reserve = 0;
-      RGWFileHandle* tfh = this;
+      const RGWFileHandle* tfh = this;
       while (tfh && !tfh->is_root() && !(tfh->is_bucket() && omit_bucket)) {
        segments.push_back(&tfh->object_name());
        reserve += (1 + tfh->object_name().length());
@@ -423,11 +423,11 @@ namespace rgw {
       return path;
     }
 
-    inline std::string relative_object_name() {
+    inline std::string relative_object_name() const {
       return full_object_name(true /* omit_bucket */);
     }
 
-    inline std::string format_child_name(const std::string& cbasename) {
+    inline std::string format_child_name(const std::string& cbasename) const {
       std::string child_name{relative_object_name()};
       if ((child_name.size() > 0) &&
          (child_name.back() != '/'))
@@ -436,7 +436,7 @@ namespace rgw {
       return child_name;
     }
 
-    inline std::string make_key_name(const char *name) {
+    inline std::string make_key_name(const char *name) const {
       std::string key_name{full_object_name()};
       if (key_name.length() > 0)
        key_name += "/";
@@ -444,7 +444,7 @@ namespace rgw {
       return key_name;
     }
 
-    fh_key make_fhk(const std::string& name) {
+    fh_key make_fhk(const std::string& name) const {
       if (depth <= 1)
        return fh_key(fhk.fh_hk.object, name.c_str());
       else {
@@ -464,9 +464,9 @@ namespace rgw {
       }
     }
 
-    const rgw_obj_key* find_marker(uint64_t off) {
+    const rgw_obj_key* find_marker(uint64_t off) const {
       using std::get;
-      directory* d = get<directory>(&variant_type);
+      const directory* d = get<directory>(&variant_type);
       if (d) {
        const auto& iter = d->marker_cache.find(off);
        if (iter != d->marker_cache.end())
@@ -484,6 +484,7 @@ namespace rgw {
     bool creating() const { return flags & FLAG_CREATING; }
     bool deleted() const { return flags & FLAG_DELETED; }
     bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; }
+    bool has_children() const;
 
     uint32_t open(uint32_t gsh_flags) {
       lock_guard guard(mtx);
@@ -1372,6 +1373,89 @@ public:
 
 }; /* RGWReaddirRequest */
 
+/*
+  dir has-children predicate (bucket objects)
+*/
+
+class RGWRMdirCheck : public RGWLibRequest,
+                     public RGWListBucket /* RGWOp */
+{
+public:
+  const RGWFileHandle* rgw_fh;
+  bool valid;
+  bool has_children;
+
+  RGWRMdirCheck (CephContext* _cct, RGWUserInfo *_user,
+                const RGWFileHandle* _rgw_fh)
+    : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), valid(false),
+      has_children(false) {
+    default_max = 2;
+    op = this;
+  }
+
+  virtual bool only_bucket() override { return false; }
+
+  virtual int op_init() override {
+    // assign store, s, and dialect_handler
+    RGWObjectCtx* rados_ctx
+      = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
+    // framework promises to call op_init after parent init
+    assert(rados_ctx);
+    RGWOp::init(rados_ctx->store, get_state(), this);
+    op = this; // assign self as op: REQUIRED
+    return 0;
+  }
+
+  virtual int header_init() override {
+    struct req_state* s = get_state();
+    s->info.method = "GET";
+    s->op = OP_GET;
+
+    std::string uri = "/" + rgw_fh->bucket_name() + "/";
+    s->relative_uri = uri;
+    s->info.request_uri = uri;
+    s->info.effective_uri = uri;
+    s->info.request_params = "";
+    s->info.domain = ""; /* XXX ? */
+
+    s->user = user;
+
+    prefix = rgw_fh->relative_object_name();
+    if (prefix.length() > 0)
+      prefix += "/";
+    delimiter = '/';
+
+    return 0;
+  }
+
+  virtual int get_params() override {
+    max = default_max;
+    return 0;
+  }
+
+  virtual void send_response() override {
+    valid = true;
+    if ((objs.size() > 1) ||
+       (! objs.empty() &&
+        (objs.front().key.name != prefix))) {
+      has_children = true;
+      return;
+    }
+    for (auto& iter : common_prefixes) {
+      /* readdir never produces a name for this case */
+      if (iter.first == "/")
+       continue;
+      has_children = true;
+      break;
+    }
+  }
+
+  virtual void send_versioned_response() {
+    send_response();
+  }
+
+}; /* RGWRMdirCheck */
+
 /*
   create bucket
 */