]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librgw: implement rgw_create + bugfixes
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 10 Nov 2015 20:51:04 +0000 (15:51 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Fri, 12 Feb 2016 17:05:54 +0000 (12:05 -0500)
Incremental work to support CREATE,OPEN,WRITE*,CLOSE transactions.
Adds a new tuple-based signature for RGWFileHandle::lookup_fh, and
fixes an inconsisten initialization of fhk and fh.fh_hk.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index ae413b41e7c5d1ba9946ee9c9f79f4a0b209f270..84bcfd59e6325aa35ab192cbdce371fa7d5f0ffe 100644 (file)
@@ -104,14 +104,43 @@ int rgw_statfs(struct rgw_fs *rgw_fs,
 }
 
 /*
-  generic create
+  generic create -- creates a regular file
 */
 int rgw_create(struct rgw_fs *rgw_fs,
               struct rgw_file_handle *parent_fh,
               const char *name, mode_t mode, struct stat *st,
               struct rgw_file_handle **fh)
 {
-  return -EINVAL;
+  /* XXX a CREATE operation can be a precursor to the canonical
+   * OPEN, WRITE*, CLOSE transaction which writes or overwrites
+   * an object in S3 */
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+
+  RGWFileHandle* parent = get_rgwfh(parent_fh);
+  if ((! parent) ||
+      (parent->is_root()) ||
+      (parent->is_object())) {
+    /* bad parent */
+    return -EINVAL;
+  }
+
+  using std::get;
+
+  LookupFHResult fhr = fs->lookup_fh(parent, name);
+  RGWFileHandle* rgw_fh = get<0>(fhr);
+
+  if (! rgw_fh)
+    return -EINVAL;
+
+
+  /* mark if we created it */
+  if (get<1>(fhr) & RGWFileHandle::FLAG_CREATE)
+    rgw_fh->open_for_create();
+
+  struct rgw_file_handle *rfh = rgw_fh->get_fh();
+  *fh = rfh;
+
+  return 0;
 }
 
 /*
@@ -148,7 +177,8 @@ int rgw_mkdir(struct rgw_fs *rgw_fs,
   rc = librgw.get_fe()->execute_req(&req);
 
   /* XXX: atomicity */
-  RGWFileHandle* rgw_fh = fs->lookup_fh(parent, name);
+  LookupFHResult fhr = fs->lookup_fh(parent, name);
+  RGWFileHandle* rgw_fh = get<0>(fhr);
 
   struct rgw_file_handle *rfh = rgw_fh->get_fh();
   *fh = rfh;
@@ -250,9 +280,11 @@ int rgw_lookup(struct rgw_fs *rgw_fs,
       rgw_fh = parent->ref();
     } else {
       /* name lookup in root--for now) just get a handle */
-      rgw_fh = fs->lookup_fh(parent, path);
+      LookupFHResult fhr = fs->lookup_fh(parent, path);
+      rgw_fh = get<0>(fhr);
+
       if (! rgw_fh)
-       return -ENOENT;
+       return -ENOMEM;
     }
   } else {
     std::string object_name{path};
@@ -266,7 +298,9 @@ int rgw_lookup(struct rgw_fs *rgw_fs,
        (! (flags & RGW_LOOKUP_FLAG_CREATE)))
       return -ENOENT;
 
-    rgw_fh = fs->lookup_fh(parent, path);
+    LookupFHResult fhr = fs->lookup_fh(parent, path);
+    rgw_fh = get<0>(fhr);
+
     if (! rgw_fh)
       return -ENOENT;
   } /* !root */
@@ -318,9 +352,6 @@ int rgw_getattr(struct rgw_fs *rgw_fs,
   CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
 
-  if (!fh)
-    return -ENOENT;
-
   RGWFileHandle* rgw_fh = get_rgwfh(fh);
 
   if (rgw_fh->is_root()) {
index 435dfb295079ec894732441a84712883aa3bc67a..a11f4d6f88d35665b00574b7d858155870461075 100644 (file)
@@ -109,15 +109,16 @@ namespace rgw {
   public:
     const static string root_name;
 
-    static constexpr uint32_t FLAG_NONE = 0x0000;
-    static constexpr uint32_t FLAG_OPEN = 0x0001;
-    static constexpr uint32_t FLAG_ROOT = 0x0002;
+    static constexpr uint32_t FLAG_NONE =   0x0000;
+    static constexpr uint32_t FLAG_OPEN =   0x0001;
+    static constexpr uint32_t FLAG_ROOT =   0x0002;
+    static constexpr uint32_t FLAG_CREATE = 0x0004;
 
     friend class RGWLibFS;
 
   private:
     RGWFileHandle(RGWLibFS* _fs)
-      : fs(_fs), parent(nullptr), flags(FLAG_ROOT)
+      : refcnt(1), fs(_fs), parent(nullptr), flags(FLAG_ROOT)
       {
        /* root */
        fh.fh_type = RGW_FS_TYPE_DIRECTORY;
@@ -130,6 +131,8 @@ namespace rgw {
       fh.fh_hk.bucket = XXH64(fsid.c_str(), fsid.length(), fh_key::seed);
       fh.fh_hk.object = XXH64(object_name.c_str(), object_name.length(),
                              fh_key::seed);
+      fhk = fh.fh_hk;
+      name = object_name;
     }
 
   public:
@@ -169,6 +172,7 @@ namespace rgw {
     bool is_root() const { return flags & FLAG_ROOT; }
     bool is_bucket() const { return (fh.fh_type == RGW_FS_TYPE_DIRECTORY); }
     bool is_object() const { return (fh.fh_type == RGW_FS_TYPE_FILE); }
+    bool creating() const { return flags & FLAG_CREATE; }
 
     void open() {
       flags |= FLAG_OPEN;
@@ -178,6 +182,10 @@ namespace rgw {
       flags &= ~FLAG_OPEN;
     }
 
+    void open_for_create() {
+      flags |= FLAG_CREATE;
+    }
+
     friend void intrusive_ptr_add_ref(const RGWFileHandle* fh) {
       fh->refcnt.fetch_add(1, std::memory_order_relaxed);
     }
@@ -251,6 +259,8 @@ namespace rgw {
     return static_cast<RGWFileHandle*>(fh->fh_private);
   }
 
+  typedef std::tuple<RGWFileHandle*, uint32_t> LookupFHResult;
+
   class RGWLibFS
   {
     CephContext* cct;
@@ -299,10 +309,13 @@ namespace rgw {
     }
 
     /* find or create an RGWFileHandle */
-    RGWFileHandle* lookup_fh(RGWFileHandle* parent, const char *name) {
+    LookupFHResult lookup_fh(RGWFileHandle* parent, const char *name) {
 
       RGWFileHandle::FHCache::Latch lat;
       fh_key fhk(parent->fhk.fh_hk.object, name);
+      LookupFHResult fhr { nullptr, RGWFileHandle::FLAG_NONE };
+
+      using std::get;
 
       RGWFileHandle* fh =
        fh_cache.find_latch(fhk.fh_hk.object /* partition selector*/,
@@ -314,10 +327,14 @@ namespace rgw {
        intrusive_ptr_add_ref(fh); /* sentinel ref */
        fh_cache.insert_latched(fh, lat,
                                RGWFileHandle::FHCache::FLAG_NONE);
+       get<1>(fhr) = RGWFileHandle::FLAG_CREATE;
       }
       intrusive_ptr_add_ref(fh); /* call path/handle ref */
       lat.lock->unlock(); /* !LATCHED */
-      return fh;
+
+      get<0>(fhr) = fh;
+
+      return fhr;
     }
 
     /* find or create an RGWFileHandle */