]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: remove hidden uxattr objects from buckets on delete 15210/head
authorMatt Benjamin <mbenjamin@redhat.com>
Mon, 22 May 2017 18:51:19 +0000 (14:51 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Mon, 22 May 2017 22:16:54 +0000 (18:16 -0400)
If a setattr (e.g., chown) has been performed on a bucket, then like
any directory it has a hidden object storing its attributes. This must
be deleted before attempting bucket delete, otherwise, actually empty
buckets will not be removable via NFS.

Fixes: http://tracker.ceph.com/issues/20045
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index f79dd0a78b75dc484ef93951f7369f7e73a0abfb..e756c484417f9e908f31413abdfe19be0a832ef7 100644 (file)
@@ -66,18 +66,20 @@ namespace rgw {
     return 0;
   }
 
-  LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent,
-                                      const char *path, uint32_t flags)
+  LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent, const char *path,
+                                      RGWLibFS::BucketStats& bs,
+                                      uint32_t flags)
   {
     LookupFHResult fhr{nullptr, 0};
     std::string bucket_name{path};
-    RGWStatBucketRequest req(cct, get_user(), bucket_name);
+    RGWStatBucketRequest req(cct, get_user(), bucket_name, bs);
 
     int rc = rgwlib.get_fe()->execute_req(&req);
     if ((rc == 0) &&
        (req.get_ret() == 0) &&
        (req.matched())) {
       fhr = lookup_fh(parent, path,
+                     (flags & RGWFileHandle::FLAG_LOCKED)|
                      RGWFileHandle::FLAG_CREATE|
                      RGWFileHandle::FLAG_BUCKET);
       if (get<0>(fhr)) {
@@ -252,7 +254,9 @@ namespace rgw {
   int RGWLibFS::unlink(RGWFileHandle* rgw_fh, const char* name, uint32_t flags)
   {
     int rc = 0;
+    BucketStats bs;
     RGWFileHandle* parent = nullptr;
+    RGWFileHandle* bkt_fh = nullptr;
 
     if (unlikely(flags & RGWFileHandle::FLAG_UNLINK_THIS)) {
       /* LOCKED */
@@ -266,6 +270,28 @@ namespace rgw {
     }
 
     if (parent->is_root()) {
+      /* a bucket may have an object storing Unix attributes, check
+       * for and delete it */
+      LookupFHResult fhr;
+      fhr = stat_bucket(parent, name, bs, RGWFileHandle::FLAG_LOCKED);
+      bkt_fh = get<0>(fhr);
+      if (unlikely(! bkt_fh)) {
+       /* implies !rgw_fh, so also !LOCKED */
+       return -ENOENT;
+      }
+
+      if (bs.num_entries > 1) {
+       unref(bkt_fh); /* return extra ref */
+       return -ENOTEMPTY;
+      } else {
+       /* delete object w/key "<bucket>/" (uxattrs), if any */
+       string oname{"/"};
+       RGWDeleteObjRequest req(cct, get_user(), bkt_fh->bucket_name(), oname);
+       rc = rgwlib.get_fe()->execute_req(&req);
+       /* don't care if ENOENT */
+       unref(bkt_fh);
+      }
+
       /* XXXX remove uri and deal with bucket and object names */
       string uri = "/";
       uri += name;
@@ -1559,7 +1585,8 @@ int rgw_lookup(struct rgw_fs *rgw_fs,
                 (strcmp(path, "/") == 0))) {
       rgw_fh = parent;
     } else {
-      fhr = fs->stat_bucket(parent, path, RGWFileHandle::FLAG_NONE);
+      RGWLibFS::BucketStats bstat;
+      fhr = fs->stat_bucket(parent, path, bstat, RGWFileHandle::FLAG_NONE);
       rgw_fh = get<0>(fhr);
       if (! rgw_fh)
        return -ENOENT;
index 1edc37a9f60ba630dcc7d731f58b799b80831897..eb3fcf2fc8a0e215d98f68423d2ba9c0330feda0 100644 (file)
@@ -809,6 +809,13 @@ namespace rgw {
     static constexpr uint32_t FLAG_NONE =      0x0000;
     static constexpr uint32_t FLAG_CLOSED =    0x0001;
 
+    struct BucketStats {
+      size_t size;
+      size_t size_rounded;
+      real_time creation_time;
+      uint64_t num_entries;
+    };
+
     RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
            const char* _key)
       : cct(_cct), root_fh(this, new_inst()), invalidate_cb(nullptr),
@@ -914,6 +921,7 @@ namespace rgw {
       LookupFHResult fhr { nullptr, uint32_t(RGWFileHandle::FLAG_NONE) };
 
       RGWFileHandle::FHCache::Latch lat;
+      bool fh_locked = flags & RGWFileHandle::FLAG_LOCKED;
 
     retry:
       RGWFileHandle* fh =
@@ -922,11 +930,13 @@ namespace rgw {
                            RGWFileHandle::FHCache::FLAG_LOCK);
       /* LATCHED */
       if (fh) {
-       fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
+       if (likely(! fh_locked))
+           fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
        /* need initial ref from LRU (fast path) */
        if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
          lat.lock->unlock();
-         fh->mtx.unlock();
+         if (likely(! fh_locked))
+           fh->mtx.unlock();
          goto retry; /* !LATCHED */
        }
        /* LATCHED, LOCKED */
@@ -958,6 +968,7 @@ namespace rgw {
        return fhr;
 
       RGWFileHandle::FHCache::Latch lat;
+      bool fh_locked = flags & RGWFileHandle::FLAG_LOCKED;
 
       std::string obj_name{name};
       std::string key_name{parent->make_key_name(name)};
@@ -977,23 +988,27 @@ namespace rgw {
                            RGWFileHandle::FHCache::FLAG_LOCK);
       /* LATCHED */
       if (fh) {
-       fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
+       if (likely(! fh_locked))
+         fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
        if (fh->flags & RGWFileHandle::FLAG_DELETED) {
          /* for now, delay briefly and retry */
          lat.lock->unlock();
-         fh->mtx.unlock();
+         if (likely(! fh_locked))
+           fh->mtx.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(20));
          goto retry; /* !LATCHED */
        }
        /* need initial ref from LRU (fast path) */
        if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
          lat.lock->unlock();
-         fh->mtx.unlock();
+         if (likely(! fh_locked))
+           fh->mtx.unlock();
          goto retry; /* !LATCHED */
        }
        /* LATCHED, LOCKED */
        if (! (flags & RGWFileHandle::FLAG_LOCK))
-         fh->mtx.unlock(); /* ! LOCKED */
+         if (likely(! fh_locked))
+           fh->mtx.unlock(); /* ! LOCKED */
       } else {
        /* make or re-use handle */
        RGWFileHandle::Factory prototype(this, get_inst(), parent, fhk,
@@ -1048,8 +1063,9 @@ namespace rgw {
     int setattr(RGWFileHandle* rgw_fh, struct stat* st, uint32_t mask,
                uint32_t flags);
 
-    LookupFHResult stat_bucket(RGWFileHandle* parent,
-                              const char *path, uint32_t flags);
+    LookupFHResult stat_bucket(RGWFileHandle* parent, const char *path,
+                              RGWLibFS::BucketStats& bs,
+                              uint32_t flags);
 
     LookupFHResult stat_leaf(RGWFileHandle* parent, const char *path,
                             enum rgw_fh_type type = RGW_FS_TYPE_NIL,
@@ -1975,10 +1991,12 @@ class RGWStatBucketRequest : public RGWLibRequest,
 public:
   std::string uri;
   std::map<std::string, buffer::list> attrs;
+  RGWLibFS::BucketStats& bs;
 
   RGWStatBucketRequest(CephContext* _cct, RGWUserInfo *_user,
-                      const std::string& _path)
-    : RGWLibRequest(_cct, _user) {
+                      const std::string& _path,
+                      RGWLibFS::BucketStats& _stats)
+    : RGWLibRequest(_cct, _user), bs(_stats) {
     uri = "/" + _path;
     op = this;
   }
@@ -2030,6 +2048,10 @@ public:
 
   void send_response() override {
     bucket.creation_time = get_state()->bucket_info.creation_time;
+    bs.size = bucket.size;
+    bs.size_rounded = bucket.size_rounded;
+    bs.creation_time = bucket.creation_time;
+    bs.num_entries = bucket.count;
     std::swap(attrs, get_state()->bucket_attrs);
   }