]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: implement new mkdir2 with atomicity
authorMatt Benjamin <mbenjamin@redhat.com>
Mon, 11 Apr 2016 23:33:14 +0000 (19:33 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Tue, 12 Apr 2016 20:11:00 +0000 (16:11 -0400)
Re-order lookup_fh, FLAG_CREATE and remote create operations,
and provide FLAG_LOCK, to get atomicity.

Other ops which need to linearize at lookup_fh still need to be
converted.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index fec2e86bcd343af6d6ac7e823cec0d63e5786834..16f4b0efee10c1970dc1480bf86fae454f482802 100644 (file)
@@ -338,6 +338,84 @@ namespace rgw {
     return mkr;
   } /* RGWLibFS::mkdir */
 
+  MkObjResult RGWLibFS::mkdir2(RGWFileHandle* parent, const char *name,
+                              struct stat *st, uint32_t mask, uint32_t flags)
+  {
+    MkObjResult mkr{nullptr, -EINVAL};
+    int rc, rc2;
+
+    LookupFHResult fhr;
+    RGWFileHandle* rgw_fh = nullptr;
+
+    fhr = lookup_fh(parent, name,
+                   RGWFileHandle::FLAG_CREATE|
+                   RGWFileHandle::FLAG_DIRECTORY|
+                   RGWFileHandle::FLAG_LOCK);
+    rgw_fh = get<0>(fhr);
+    if (rgw_fh) {
+      /* XXX unify timestamps */
+      rgw_fh->create_stat(st, mask);
+      rgw_fh->set_times(real_clock::now());
+      rgw_fh->stat(st);
+      get<0>(mkr) = rgw_fh;
+    } else {
+      get<1>(mkr) = -EIO;
+      return mkr;
+    }
+
+    if (parent->is_root()) {
+      /* bucket */
+      string bname{name};
+      /* enforce S3 name restrictions */
+      rc = valid_s3_bucket_name(bname, false /* relaxed */);
+      if (rc != 0) {
+       rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
+       rgw_fh->mtx.unlock();
+       unref(rgw_fh);
+       get<0>(mkr) = nullptr;
+       return mkr;
+      }
+
+      string uri = "/" + bname; /* XXX get rid of URI some day soon */
+      RGWCreateBucketRequest req(get_context(), get_user(), uri);
+      rc = rgwlib.get_fe()->execute_req(&req);
+      rc2 = req.get_ret();
+    } else {
+      /* create an object representing the directory */
+      buffer::list bl;
+      string dir_name = /* XXX get rid of this some day soon, too */
+       parent->relative_object_name();
+      /* creating objects w/leading '/' makes a mess */
+      if ((dir_name.size() > 0) &&
+         (dir_name.back() != '/'))
+       dir_name += "/";
+      dir_name += name;
+      dir_name += "/";
+      RGWPutObjRequest req(get_context(), get_user(), parent->bucket_name(),
+                         dir_name, bl);
+      rc = rgwlib.get_fe()->execute_req(&req);
+      rc2 = req.get_ret();
+    }
+
+    if (! ((rc == 0) &&
+          (rc2 == 0))) {
+      /* op failed */
+      rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
+      rgw_fh->mtx.unlock();
+      unref(rgw_fh);
+      get<0>(mkr) = nullptr;
+      /* fixup rc */
+      if (!rc)
+       rc = rc2;
+    }
+
+  out:
+    rgw_fh->mtx.unlock(); /* !LOCKED */
+    get<1>(mkr) = rc;
+
+    return mkr;
+  } /* RGWLibFS::mkdir2 */
+
   MkObjResult RGWLibFS::create(RGWFileHandle* parent, const char *name,
                              struct stat *st, uint32_t mask, uint32_t flags)
   {
@@ -908,7 +986,7 @@ int rgw_mkdir(struct rgw_fs *rgw_fs,
     return -EINVAL;
   }
 
-  MkObjResult fhr = fs->mkdir(parent, name, st, mask, flags);
+  MkObjResult fhr = fs->mkdir2(parent, name, st, mask, flags);
   RGWFileHandle *nfh = get<0>(fhr); // nullptr if !success
 
   if (nfh)
index aba96d622c653b5c9cacc0e05be95e293510e9ae..2acd77d8fe904f7bae2c37ecf13551763bc0c051 100644 (file)
@@ -830,10 +830,12 @@ namespace rgw {
                        cohort::lru::Edge::MRU,
                        cohort::lru::FLAG_INITIAL));
        if (fh) {
-         fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
-         get<1>(fhr) |= RGWFileHandle::FLAG_CREATE;
+         /* lock fh (LATCHED) */
          if (fh->flags & RGWFileHandle::FLAG_LOCK)
            fh->mtx.lock();
+         /* inserts, releasing latch */
+         fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
+         get<1>(fhr) |= RGWFileHandle::FLAG_CREATE;
          goto out; /* !LATCHED */
        } else {
          lat.lock->unlock();
@@ -874,6 +876,8 @@ namespace rgw {
 
     MkObjResult mkdir(RGWFileHandle* parent, const char *name, struct stat *st,
                      uint32_t mask, uint32_t flags);
+    MkObjResult mkdir2(RGWFileHandle* parent, const char *name, struct stat *st,
+                     uint32_t mask, uint32_t flags);
 
     int unlink(RGWFileHandle* parent, const char *name);