]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librgw: fix rgw_unlink and delete atomicity
authorMatt Benjamin <mbenjamin@redhat.com>
Fri, 15 Jan 2016 17:45:58 +0000 (12:45 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Fri, 12 Feb 2016 17:08:05 +0000 (12:08 -0500)
Move internal unlink logic into new RGWLibFS::unlink method, fix
logic for files and directories.

Use RGWFileHandle::mtx and  RGWFileHandle::FLAG_DELETED to ensure
atomicity of handle creates and deletes.

Remove handles for unlinked objects from cache.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index 900590bec6061ac343de289e59acfe427381a163..d46d4c9813a575c1b47b2270e81db524b86d133f 100644 (file)
@@ -145,12 +145,73 @@ namespace rgw {
     return fhr;
   } /* RGWLibFS::stat_leaf */
 
+  int RGWLibFS::unlink(RGWFileHandle* parent, const char *name)
+  {
+    int rc = 0;
+
+    /* atomicity */
+    LookupFHResult fhr = lookup_fh(parent, name, RGWFileHandle::FLAG_LOCK);
+    RGWFileHandle* rgw_fh = get<0>(fhr);
+
+    if (parent->is_root()) {
+      /* XXXX remove uri and deal with bucket and object names */
+      string uri = "/";
+      uri += name;
+      RGWDeleteBucketRequest req(cct, get_user(), uri);
+      rc = rgwlib.get_fe()->execute_req(&req);
+      if (! rc) {
+       rc = req.get_ret();
+      }
+    } else {
+      /*
+       * leaf object
+       */
+      if (! rgw_fh) {
+       /* XXX for now, peform a hard lookup to deduce the type of
+        * object to be deleted ("foo" vs. "foo/")--also, ensures
+        * atomicity at this endpoint */
+       struct rgw_file_handle *fh;
+       rc = rgw_lookup(get_fs(), parent->get_fh(), name, &fh,
+                       RGW_LOOKUP_FLAG_NONE);
+       if (!! rc)
+         return rc;
+
+       /* rgw_fh ref+ */
+       rgw_fh = get_rgwfh(fh);
+       rgw_fh->mtx.lock();
+      }
+
+      std::string oname = rgw_fh->relative_object_name();
+      if (rgw_fh->is_dir())
+       oname += "/";
+      RGWDeleteObjRequest req(cct, get_user(), parent->bucket_name(),
+                             oname);
+      rc = rgwlib.get_fe()->execute_req(&req);
+      if (! rc) {
+       rc = req.get_ret();
+      }
+    }
+
+    rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
+    fh_cache.remove(rgw_fh->fh.fh_hk.object, rgw_fh, cohort::lru::FLAG_NONE);
+    rgw_fh->mtx.unlock();
+    unref(rgw_fh);
+
+    return rc;
+  } /* RGWLibFS::unlink */
+
   int RGWLibFS::rename(RGWFileHandle* src_fh, RGWFileHandle* dst_fh,
-                     const char *src_name, const char *dst_name)
+                      const char *src_name, const char *dst_name)
 
   {
     /* XXX initial implementation: try-copy, and delete if copy
      * succeeds */
+    int rc = -EINVAL;
+
+    /* atomicity */
+    LookupFHResult fhr = lookup_fh(src_fh, src_name, RGWFileHandle::FLAG_LOCK);
+    RGWFileHandle* rgw_fh = get<0>(fhr);
+
     for (int ix : {0, 1}) {
       switch (ix) {
       case 0:
@@ -160,7 +221,7 @@ namespace rgw {
        int rc = rgwlib.get_fe()->execute_req(&req);
        if ((rc != 0) ||
            ((rc = req.get_ret()) != 0)) {
-         return rc;
+         goto out;
        }
       }
       break;
@@ -171,18 +232,22 @@ namespace rgw {
        int rc = rgwlib.get_fe()->execute_req(&req);
        if (! rc) {
          rc = req.get_ret();
-         return rc;
+         goto out;
        }
       }
       break;
       default:
        abort();
       } /* switch */
-
-      return -EINVAL;
+    } /* ix */
+  out:
+    if (rgw_fh) {
+      rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
+      fh_cache.remove(rgw_fh->fh.fh_hk.object, rgw_fh, cohort::lru::FLAG_NONE);
+      rgw_fh->mtx.unlock();
+      unref(rgw_fh);
     }
-
-    return EINVAL;
+    return rc;
   } /* RGWLibFS::rename */
 
   int RGWLibFS::getattr(RGWFileHandle* rgw_fh, struct stat* st)
@@ -856,7 +921,7 @@ int rgw_rename(struct rgw_fs *rgw_fs,
   RGWFileHandle* old_fh = get_rgwfh(olddir);
   RGWFileHandle* new_fh = get_rgwfh(newdir);
 
-  return -(fs->rename(old_fh, new_fh, old_name, new_name));
+  return fs->rename(old_fh, new_fh, old_name, new_name);
 }
 
 /*
@@ -865,47 +930,10 @@ int rgw_rename(struct rgw_fs *rgw_fs,
 int rgw_unlink(struct rgw_fs *rgw_fs, struct rgw_file_handle *parent_fh,
               const char *name, uint32_t flags)
 {
-  int rc = 0;
-
   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
-  CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
-
   RGWFileHandle* parent = get_rgwfh(parent_fh);
 
-  if (parent->is_root()) {
-    /* XXXX remove uri and deal with bucket and object names */
-    string uri = "/";
-    uri += name;
-    RGWDeleteBucketRequest req(cct, fs->get_user(), uri);
-    rc = rgwlib.get_fe()->execute_req(&req);
-    if (! rc) {
-      rc = req.get_ret();
-    }
-  } else {
-    /*
-     * leaf object
-     */
-    /* XXX we must peform a hard lookup to deduce the type of
-     * object to be deleted ("foo" vs. "foo/"), and as a side effect
-     * can do no further work if no object existed */
-    struct rgw_file_handle *fh;
-    rc = rgw_lookup(rgw_fs, parent_fh, name, &fh, RGW_LOOKUP_FLAG_NONE);
-    if (! rc)  {
-      /* rgw_fh ref+ */
-      RGWFileHandle* rgw_fh = get_rgwfh(fh);
-      std::string oname = rgw_fh->relative_object_name();
-      RGWDeleteObjRequest req(cct, fs->get_user(), parent->bucket_name(),
-                             oname);
-      rc = rgwlib.get_fe()->execute_req(&req);
-      if (! rc) {
-       rc = req.get_ret();
-      }
-      /* release */
-      (void) rgw_fh_rele(rgw_fs, fh, 0 /* flags */);
-    }
-  }
-
-  return -rc;
+  return fs->unlink(parent, name);
 }
 
 /*
index fb96e0e8ea2055938b5842775aee8580a788852e..79ab1c0cad9b5f4ffcc88bc20d7c6f69c96969bf 100644 (file)
@@ -12,6 +12,8 @@
 #include <stdint.h>
 
 #include <atomic>
+#include <chrono>
+#include <thread>
 #include <mutex>
 #include <vector>
 #include <deque>
@@ -205,6 +207,7 @@ namespace rgw {
     static constexpr uint32_t FLAG_DIRECTORY = 0x0010;
     static constexpr uint32_t FLAG_BUCKET = 0x0020;
     static constexpr uint32_t FLAG_LOCK =   0x0040;
+    static constexpr uint32_t FLAG_DELETED = 0x0080;
 
     friend class RGWLibFS;
 
@@ -677,13 +680,13 @@ namespace rgw {
 
       LookupFHResult fhr { nullptr, RGWFileHandle::FLAG_NONE };
 
+      /* mount is stale? */
       if (state.flags & FLAG_CLOSED)
        return fhr;
 
       RGWFileHandle::FHCache::Latch lat;
       memset(&lat, 0, sizeof(lat)); // XXXX testing
 
-
       std::string obj_name{name};
       std::string key_name{parent->make_key_name(name)};
 
@@ -696,12 +699,23 @@ namespace rgw {
                            RGWFileHandle::FHCache::FLAG_LOCK);
       /* LATCHED */
       if (fh) {
+       fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
+       if (fh->flags & RGWFileHandle::FLAG_DELETED) {
+         /* for now, delay briefly and retry */
+         lat.lock->unlock();
+         fh->mtx.unlock();
+         std::this_thread::sleep_for(std::chrono::milliseconds(20));
+         goto retry; /* !LATCHED */
+       }
        /* need initial ref from LRU (fast path) */
        if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
          lat.lock->unlock();
+         fh->mtx.unlock();
          goto retry; /* !LATCHED */
        }
-       /* LATCHED */
+       /* LATCHED, LOCKED */
+       if (! (fh->flags & RGWFileHandle::FLAG_LOCK))
+         fh->mtx.unlock(); /* ! LOCKED */
       } else {
        /* make or re-use handle */
        RGWFileHandle::Factory prototype(this, get_inst(), parent, fhk,
@@ -713,6 +727,8 @@ namespace rgw {
        if (fh) {
          fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
          get<1>(fhr) |= RGWFileHandle::FLAG_CREATE;
+         if (fh->flags & RGWFileHandle::FLAG_LOCK)
+           fh->mtx.lock();
          goto out; /* !LATCHED */
        } else {
          lat.lock->unlock();
@@ -745,6 +761,8 @@ namespace rgw {
     int rename(RGWFileHandle* old_fh, RGWFileHandle* new_fh,
               const char *old_name, const char *new_name);
 
+    int unlink(RGWFileHandle* parent, const char *name);
+
     /* find existing RGWFileHandle */
     RGWFileHandle* lookup_handle(struct rgw_fh_hk fh_hk) {
 
@@ -768,8 +786,17 @@ namespace rgw {
          << dendl;
        goto out;
       }
+      fh->mtx.lock();
+      if (fh->flags & RGWFileHandle::FLAG_DELETED) {
+       /* for now, delay briefly and retry */
+       lat.lock->unlock();
+       fh->mtx.unlock();
+       std::this_thread::sleep_for(std::chrono::milliseconds(20));
+       goto retry; /* !LATCHED */
+      }
       if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
        lat.lock->unlock();
+       fh->mtx.unlock();
        goto retry; /* !LATCHED */
       }
       /* LATCHED */