]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: Index caching is introduced for performance improvement
authorSomnath Roy <somnath.roy@sandisk.com>
Mon, 30 Jun 2014 08:28:07 +0000 (01:28 -0700)
committerSomnath Roy <somnath.roy@sandisk.com>
Thu, 14 Aug 2014 22:28:30 +0000 (15:28 -0700)
IndexManager now has a Index caching. Index will only be created if not
found in the cache. Earlier, each op is creating an Index object and other
ops requesting the same index needed to wait till previous op is done.
Also, after finishing lookup, this Index object was destroyed.
Now, a Index cache is been implemented to persists these Indexes since
there is a major performance hit because each op is creating and destroying
these. A RWlock is been introduced in the CollectionIndex class and that is
responsible for sync between lookup and create.
Also, since these Index objects are persistent there is no need to use
smart pointers. So, Index is a wrapper class of CollecIndex* now.
It is the responsibility of the users of Index now to lock explicitely
before using them. Index object is sufficient now for locking and no need
to hold IndexPath for locking. The function interfaces of lfn_open,lfn_find
are changed accordingly.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
13 files changed:
src/os/CollectionIndex.h
src/os/FileStore.cc
src/os/FileStore.h
src/os/FlatIndex.cc
src/os/FlatIndex.h
src/os/HashIndex.cc
src/os/HashIndex.h
src/os/IndexManager.cc
src/os/IndexManager.h
src/os/LFNIndex.cc
src/os/LFNIndex.h
src/test/os/TestFlatIndex.cc
src/test/os/TestLFNIndex.cc

index 529db015bf039938cd26b2824bc8212310ef7cbf..d24d257325db2fb0c1a102b3b46771d5753f4a31 100644 (file)
@@ -21,6 +21,7 @@
 
 #include "osd/osd_types.h"
 #include "include/object.h"
+#include "common/RWLock.h"
 
 /**
  * CollectionIndex provides an interface for manipulating indexed collections
@@ -43,14 +44,14 @@ protected:
     /// Returned path
     string full_path;
     /// Ref to parent Index
-    ceph::shared_ptr<CollectionIndex> parent_ref;
+    CollectionIndex* parent_ref;
     /// coll_t for parent Index
     coll_t parent_coll;
 
     /// Normal Constructor
     Path(
       string path,                              ///< [in] Path to return.
-      ceph::weak_ptr<CollectionIndex> ref)  ///< [in] weak_ptr to parent.
+      CollectionIndex* ref)  
       : full_path(path), parent_ref(ref), parent_coll(parent_ref->coll()) {}
 
     /// Debugging Constructor
@@ -66,11 +67,13 @@ protected:
     coll_t coll() const { return parent_coll; }
 
     /// Getter for parent
-    ceph::shared_ptr<CollectionIndex> get_index() const {
+    CollectionIndex* get_index() const {
       return parent_ref;
     }
   };
  public:
+
+  RWLock access_lock;
   /// Type of returned paths
   typedef ceph::shared_ptr<Path> IndexedPath;
 
@@ -94,12 +97,6 @@ protected:
    */
   virtual coll_t coll() const = 0;
 
-  /** 
-   * For setting the internal weak_ptr to a shared_ptr to this.
-   *
-   * @see IndexManager
-   */
-  virtual void set_ref(ceph::shared_ptr<CollectionIndex> ref) = 0;
 
   /** 
    * Initializes the index.
@@ -161,7 +158,7 @@ protected:
   virtual int split(
     uint32_t match,                             //< [in] value to match
     uint32_t bits,                              //< [in] bits to check
-    ceph::shared_ptr<CollectionIndex> dest  //< [in] destination index
+    CollectionIndex* dest  //< [in] destination index
     ) { assert(0); return 0; }
 
 
@@ -183,6 +180,8 @@ protected:
   /// Call prior to removing directory
   virtual int prep_delete() { return 0; }
 
+  CollectionIndex():access_lock("CollectionIndex::access_lock"){}
+
   /// Virtual destructor
   virtual ~CollectionIndex() {}
 };
index 436f1e772ae1b52f31390250986ba89a0d9a217e..9e27244f90d3fe0faed2b8969e04ee1aab8bad4e 100644 (file)
@@ -147,9 +147,7 @@ int FileStore::get_cdir(coll_t cid, char *s, int len)
 
 int FileStore::get_index(coll_t cid, Index *index)
 {
-  char path[PATH_MAX];
-  get_cdir(cid, path, sizeof(path));
-  int r = index_manager.get_index(cid, path, index);
+  int r = index_manager.get_index(cid, basedir, index);
   assert(!m_filestore_fail_eio || r != -EIO);
   return r;
 }
@@ -163,15 +161,14 @@ int FileStore::init_index(coll_t cid)
   return r;
 }
 
-int FileStore::lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path)
+int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
 {
-  Index index; 
+  IndexedPath path2;
+  if (!path)
+    path = &path2;
   int r, exist;
-  r = get_index(cid, &index);
-  if (r < 0)
-    return r;
-
-  r = index->lookup(oid, path, &exist);
+  assert(NULL != index.index);
+  r = (index.index)->lookup(oid, path, &exist);
   if (r < 0) {
     assert(!m_filestore_fail_eio || r != -EIO);
     return r;
@@ -183,9 +180,8 @@ int FileStore::lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path)
 
 int FileStore::lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length)
 {
-  IndexedPath path;
   FDRef fd;
-  int r = lfn_open(cid, oid, false, &fd, &path);
+  int r = lfn_open(cid, oid, false, &fd);
   if (r < 0)
     return r;
   r = ::ftruncate(**fd, length);
@@ -202,7 +198,15 @@ int FileStore::lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length)
 int FileStore::lfn_stat(coll_t cid, const ghobject_t& oid, struct stat *buf)
 {
   IndexedPath path;
-  int r = lfn_find(cid, oid, &path);
+  Index index;
+  int r = get_index(cid, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(oid, index, &path);
   if (r < 0)
     return r;
   r = ::stat(path->path(), buf);
@@ -215,23 +219,27 @@ int FileStore::lfn_open(coll_t cid,
                        const ghobject_t& oid,
                        bool create,
                        FDRef *outfd,
-                       IndexedPath *path,
-                       Index *index) 
+                        Index *index)
 {
   assert(get_allow_sharded_objects() ||
         ( oid.shard_id == shard_id_t::NO_SHARD &&
           oid.generation == ghobject_t::NO_GEN ));
   assert(outfd);
+
+  bool need_lock = true;
   int flags = O_RDWR;
   if (create)
     flags |= O_CREAT;
+
   Index index2;
   if (!index) {
     index = &index2;
   }
   int r = 0;
-  if (!(*index)) {
+  if (!((*index).index)) {
     r = get_index(cid, index);
+  } else {
+    need_lock = false;
   }
 
   int fd, exist;
@@ -241,48 +249,50 @@ int FileStore::lfn_open(coll_t cid,
       return 0;
   }
 
-  {
-    IndexedPath path2;
-    if (!path)
-      path = &path2;
-    if (r < 0) {
-      derr << "error getting collection index for " << cid
-          << ": " << cpp_strerror(-r) << dendl;
-      goto fail;
-    }
-    r = (*index)->lookup(oid, path, &exist);
+  
+  assert(NULL != (*index).index);
+  if (need_lock) {
+    ((*index).index)->access_lock.get_write();
+  }
+
+  IndexedPath path2;
+  IndexedPath *path = &path2;
+  if (r < 0) {
+    derr << "error getting collection index for " << cid
+      << ": " << cpp_strerror(-r) << dendl;
+    goto fail;
+  }
+  r = (*index)->lookup(oid, path, &exist);
+  if (r < 0) {
+    derr << "could not find " << oid << " in index: "
+      << cpp_strerror(-r) << dendl;
+    goto fail;
+  }
+
+  r = ::open((*path)->path(), flags, 0644);
+  if (r < 0) {
+    r = -errno;
+    dout(10) << "error opening file " << (*path)->path() << " with flags="
+      << flags << ": " << cpp_strerror(-r) << dendl;
+    goto fail;
+  }
+  fd = r;
+  if (create && (!exist)) {
+    r = (*index)->created(oid, (*path)->path());
     if (r < 0) {
-      derr << "could not find " << oid << " in index: "
-          << cpp_strerror(-r) << dendl;
+      VOID_TEMP_FAILURE_RETRY(::close(fd));
+      derr << "error creating " << oid << " (" << (*path)->path()
+          << ") in index: " << cpp_strerror(-r) << dendl;
       goto fail;
     }
-
-    r = ::open((*path)->path(), flags, 0644);
+    r = chain_fsetxattr(fd, XATTR_SPILL_OUT_NAME,
+                        XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
     if (r < 0) {
-      r = -errno;
-      dout(10) << "error opening file " << (*path)->path() << " with flags="
-              << flags << ": " << cpp_strerror(-r) << dendl;
+      VOID_TEMP_FAILURE_RETRY(::close(fd));
+      derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
+                     << "):" << cpp_strerror(-r) << dendl;
       goto fail;
     }
-    fd = r;
-
-    if (create && (!exist)) {
-      r = (*index)->created(oid, (*path)->path());
-      if (r < 0) {
-       VOID_TEMP_FAILURE_RETRY(::close(fd));
-       derr << "error creating " << oid << " (" << (*path)->path()
-            << ") in index: " << cpp_strerror(-r) << dendl;
-       goto fail;
-      }
-      r = chain_fsetxattr(fd, XATTR_SPILL_OUT_NAME,
-                          XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
-      if (r < 0) {
-        VOID_TEMP_FAILURE_RETRY(::close(fd));
-        derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
-                       << "):" << cpp_strerror(-r) << dendl;
-        goto fail;
-      }
-    }
   }
 
   if (!replaying) {
@@ -290,14 +300,23 @@ int FileStore::lfn_open(coll_t cid,
     *outfd = fdcache.add(oid, fd, &existed);
     if (existed) {
       TEMP_FAILURE_RETRY(::close(fd));
-      return 0;
     }
   } else {
     *outfd = FDRef(new FDCache::FD(fd));
   }
+
+  if (need_lock) {
+    ((*index).index)->access_lock.put_write();
+  }
+
   return 0;
 
  fail:
+
+  if (need_lock) {
+    ((*index).index)->access_lock.put_write();
+  }
+
   assert(!m_filestore_fail_eio || r != -EIO);
   return r;
 }
@@ -312,6 +331,7 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghob
   IndexedPath path_new, path_old;
   int exist;
   int r;
+  bool index_same = false;
   if (c < newcid) {
     r = get_index(newcid, &index_new);
     if (r < 0)
@@ -324,6 +344,7 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghob
     if (r < 0)
       return r;
     index_new = index_old;
+    index_same = true;
   } else {
     r = get_index(c, &index_old);
     if (r < 0)
@@ -333,33 +354,73 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghob
       return r;
   }
 
-  r = index_old->lookup(o, &path_old, &exist);
-  if (r < 0) {
-    assert(!m_filestore_fail_eio || r != -EIO);
-    return r;
-  }
-  if (!exist)
-    return -ENOENT;
+  assert(NULL != index_old.index);
+  assert(NULL != index_new.index);
 
-  r = index_new->lookup(newoid, &path_new, &exist);
-  if (r < 0) {
-    assert(!m_filestore_fail_eio || r != -EIO);
-    return r;
-  }
-  if (exist)
-    return -EEXIST;
+  if (!index_same) {
 
-  dout(25) << "lfn_link path_old: " << path_old << dendl;
-  dout(25) << "lfn_link path_new: " << path_new << dendl;
-  r = ::link(path_old->path(), path_new->path());
-  if (r < 0)
-    return -errno;
+    RWLock::RLocker l1((index_old.index)->access_lock);
 
-  r = index_new->created(newoid, path_new->path());
-  if (r < 0) {
-    assert(!m_filestore_fail_eio || r != -EIO);
-    return r;
-  }
+    r = index_old->lookup(o, &path_old, &exist);
+    if (r < 0) {
+      assert(!m_filestore_fail_eio || r != -EIO);
+      return r;
+    }
+    if (!exist)
+      return -ENOENT;
+  
+    RWLock::WLocker l2((index_new.index)->access_lock);
+
+    r = index_new->lookup(newoid, &path_new, &exist);
+    if (r < 0) {
+      assert(!m_filestore_fail_eio || r != -EIO);
+      return r;
+    }
+    if (exist)
+      return -EEXIST;
+
+    dout(25) << "lfn_link path_old: " << path_old << dendl;
+    dout(25) << "lfn_link path_new: " << path_new << dendl;
+    r = ::link(path_old->path(), path_new->path());
+    if (r < 0)
+      return -errno;
+
+    r = index_new->created(newoid, path_new->path());
+    if (r < 0) {
+      assert(!m_filestore_fail_eio || r != -EIO);
+      return r;
+    }
+  } else {
+    RWLock::WLocker l1((index_old.index)->access_lock);
+
+    r = index_old->lookup(o, &path_old, &exist);
+    if (r < 0) {
+      assert(!m_filestore_fail_eio || r != -EIO);
+      return r;
+    }
+    if (!exist)
+      return -ENOENT;
+
+    r = index_new->lookup(newoid, &path_new, &exist);
+    if (r < 0) {
+      assert(!m_filestore_fail_eio || r != -EIO);
+      return r;
+    }
+    if (exist)
+      return -EEXIST;
+
+    dout(25) << "lfn_link path_old: " << path_old << dendl;
+    dout(25) << "lfn_link path_new: " << path_new << dendl;
+    r = ::link(path_old->path(), path_new->path());
+    if (r < 0)
+      return -errno;
+
+    r = index_new->created(newoid, path_new->path());
+    if (r < 0) {
+      assert(!m_filestore_fail_eio || r != -EIO);
+      return r;
+    }
+  }    
   return 0;
 }
 
@@ -371,6 +432,10 @@ int FileStore::lfn_unlink(coll_t cid, const ghobject_t& o,
   int r = get_index(cid, &index);
   if (r < 0)
     return r;
+
+  assert(NULL != index.index);
+  RWLock::WLocker l((index.index)->access_lock);
+
   {
     IndexedPath path;
     int exist;
@@ -1441,6 +1506,9 @@ int FileStore::mount()
             << " with error: " << ret << dendl;
        goto close_current_fd;
       }
+      assert(NULL != index.index);
+      RWLock::WLocker l((index.index)->access_lock);
+
       index->cleanup();
     }
   }
@@ -2880,12 +2948,14 @@ int FileStore::_clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& ne
   FDRef o, n;
   {
     Index index;
-    IndexedPath from, to;
-    r = lfn_open(cid, oldoid, false, &o, &from, &index);
+    r = lfn_open(cid, oldoid, false, &o, &index);
     if (r < 0) {
       goto out2;
     }
-    r = lfn_open(cid, newoid, true, &n, &to, &index);
+    assert(NULL != (index.index));
+    RWLock::WLocker l((index.index)->access_lock);
+
+    r = lfn_open(cid, newoid, true, &n, &index);
     if (r < 0) {
       goto out;
     }
@@ -3636,6 +3706,9 @@ int FileStore::getattr(coll_t cid, const ghobject_t& oid, const char *name, buff
       dout(10) << __func__ << " could not get index r = " << r << dendl;
       goto out;
     }
+    assert(NULL != index.index);
+    RWLock::RLocker l((index.index)->access_lock);
+
     r = object_map->get_xattrs(oid, to_get, &got);
     if (r < 0 && r != -ENOENT) {
       dout(10) << __func__ << " get_xattrs err r =" << r << dendl;
@@ -3695,19 +3768,24 @@ int FileStore::getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>
     dout(10) << __func__ << " could not get index r = " << r << dendl;
     goto out;
   }
-  r = object_map->get_all_xattrs(oid, &omap_attrs);
-  if (r < 0 && r != -ENOENT) {
-    dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
-    goto out;
-  }
+  {
+    assert(NULL != index.index);
+    RWLock::RLocker l((index.index)->access_lock);
 
-  r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
-  if (r < 0 && r != -ENOENT) {
-    dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
-    goto out;
+    r = object_map->get_all_xattrs(oid, &omap_attrs);
+    if (r < 0 && r != -ENOENT) {
+      dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
+      goto out;
+    }
+
+    r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
+    if (r < 0 && r != -ENOENT) {
+      dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
+      goto out;
+    }
+    if (r == -ENOENT)
+      r = 0;
   }
-  if (r == -ENOENT)
-    r = 0;
   assert(omap_attrs.size() == omap_aset.size());
   for (map<string, bufferlist>::iterator i = omap_aset.begin();
         i != omap_aset.end();
@@ -3857,6 +3935,9 @@ int FileStore::_rmattr(coll_t cid, const ghobject_t& oid, const char *name,
       dout(10) << __func__ << " could not get index r = " << r << dendl;
       goto out_close;
     }
+    assert(NULL != index.index);
+    RWLock::RLocker l((index.index)->access_lock);
+
     set<string> to_remove;
     to_remove.insert(string(name));
     r = object_map->remove_xattrs(oid, to_remove, &spos);
@@ -3916,22 +3997,26 @@ int FileStore::_rmattrs(coll_t cid, const ghobject_t& oid,
     dout(10) << __func__ << " could not get index r = " << r << dendl;
     goto out_close;
   }
-  r = object_map->get_all_xattrs(oid, &omap_attrs);
-  if (r < 0 && r != -ENOENT) {
-    dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
-    assert(!m_filestore_fail_eio || r != -EIO);
-    goto out_close;
-  }
-  r = object_map->remove_xattrs(oid, omap_attrs, &spos);
-  if (r < 0 && r != -ENOENT) {
-    dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
-    goto out_close;
-  }
-  if (r == -ENOENT)
-    r = 0;
+  {
+    assert(NULL != index.index);
+    RWLock::RLocker l((index.index)->access_lock);
 
-  chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
+    r = object_map->get_all_xattrs(oid, &omap_attrs);
+    if (r < 0 && r != -ENOENT) {
+      dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
+      assert(!m_filestore_fail_eio || r != -EIO);
+      goto out_close;
+    }
+    r = object_map->remove_xattrs(oid, omap_attrs, &spos);
+    if (r < 0 && r != -ENOENT) {
+      dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
+      goto out_close;
+    }
+    if (r == -ENOENT)
+      r = 0;
+    chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
                  sizeof(XATTR_NO_SPILL_OUT));
+  }
 
  out_close:
   lfn_close(fd);
@@ -4165,6 +4250,10 @@ int FileStore::collection_version_current(coll_t c, uint32_t *version)
   int r = get_index(c, &index);
   if (r < 0)
     return r;
+
+  assert(NULL != index.index);
+  RWLock::WLocker l((index.index)->access_lock);
+
   *version = index->collection_version();
   if (*version == target_version)
     return 1;
@@ -4259,6 +4348,10 @@ bool FileStore::collection_empty(coll_t c)
   int r = get_index(c, &index);
   if (r < 0)
     return false;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
   vector<ghobject_t> ls;
   collection_list_handle_t handle;
   r = index->collection_list_partial(ghobject_t(), 1, 1, 0, &ls, NULL);
@@ -4312,6 +4405,10 @@ int FileStore::collection_list_partial(coll_t c, ghobject_t start,
   int r = get_index(c, &index);
   if (r < 0)
     return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
   r = index->collection_list_partial(start,
                                     min, max, seq,
                                     ls, next);
@@ -4330,6 +4427,10 @@ int FileStore::collection_list(coll_t c, vector<ghobject_t>& ls)
   int r = get_index(c, &index);
   if (r < 0)
     return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
   r = index->collection_list(&ls);
   assert(!m_filestore_fail_eio || r != -EIO);
   return r;
@@ -4340,8 +4441,15 @@ int FileStore::omap_get(coll_t c, const ghobject_t &hoid,
                        map<string, bufferlist> *out)
 {
   dout(15) << __func__ << " " << c << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(c, hoid, &path);
+  Index index;
+  int r = get_index(c, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
   if (r < 0)
     return r;
   r = object_map->get(hoid, header, out);
@@ -4359,8 +4467,16 @@ int FileStore::omap_get_header(
   bool allow_eio)
 {
   dout(15) << __func__ << " " << c << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(c, hoid, &path);
+  Index index;
+  int r = get_index(c, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   r = object_map->get_header(hoid, bl);
@@ -4374,8 +4490,16 @@ int FileStore::omap_get_header(
 int FileStore::omap_get_keys(coll_t c, const ghobject_t &hoid, set<string> *keys)
 {
   dout(15) << __func__ << " " << c << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(c, hoid, &path);
+  Index index;
+  int r = get_index(c, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   r = object_map->get_keys(hoid, keys);
@@ -4391,8 +4515,16 @@ int FileStore::omap_get_values(coll_t c, const ghobject_t &hoid,
                               map<string, bufferlist> *out)
 {
   dout(15) << __func__ << " " << c << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(c, hoid, &path);
+  Index index;
+  int r = get_index(c, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   r = object_map->get_values(hoid, keys, out);
@@ -4408,8 +4540,17 @@ int FileStore::omap_check_keys(coll_t c, const ghobject_t &hoid,
                               set<string> *out)
 {
   dout(15) << __func__ << " " << c << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(c, hoid, &path);
+
+  Index index;
+  int r = get_index(c, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   r = object_map->check_keys(hoid, keys, out);
@@ -4424,8 +4565,16 @@ ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(coll_t c,
                                                          const ghobject_t &hoid)
 {
   dout(15) << __func__ << " " << c << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(c, hoid, &path);
+  Index index;
+  int r = get_index(c, &index);
+  if (r < 0)
+    return ObjectMap::ObjectMapIterator(); 
+
+  assert(NULL != index.index);
+  RWLock::RLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return ObjectMap::ObjectMapIterator();
   return object_map->get_iterator(hoid);
@@ -4477,6 +4626,9 @@ int FileStore::_destroy_collection(coll_t c)
     int r = get_index(c, &from);
     if (r < 0)
       return r;
+    assert(NULL != from.index);
+    RWLock::WLocker l((from.index)->access_lock);
+
     r = from->prep_delete();
     if (r < 0)
       return r;
@@ -4646,8 +4798,16 @@ void FileStore::_inject_failure()
 int FileStore::_omap_clear(coll_t cid, const ghobject_t &hoid,
                           const SequencerPosition &spos) {
   dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(cid, hoid, &path);
+  Index index;
+  int r = get_index(cid, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::WLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   r = object_map->clear_keys_header(hoid, &spos);
@@ -4660,8 +4820,16 @@ int FileStore::_omap_setkeys(coll_t cid, const ghobject_t &hoid,
                             const map<string, bufferlist> &aset,
                             const SequencerPosition &spos) {
   dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(cid, hoid, &path);
+  Index index;
+  int r = get_index(cid, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::WLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   return object_map->set_keys(hoid, aset, &spos);
@@ -4671,8 +4839,16 @@ int FileStore::_omap_rmkeys(coll_t cid, const ghobject_t &hoid,
                            const set<string> &keys,
                            const SequencerPosition &spos) {
   dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(cid, hoid, &path);
+  Index index;
+  int r = get_index(cid, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::WLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   r = object_map->rm_keys(hoid, keys, &spos);
@@ -4703,8 +4879,16 @@ int FileStore::_omap_setheader(coll_t cid, const ghobject_t &hoid,
                               const SequencerPosition &spos)
 {
   dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
-  IndexedPath path;
-  int r = lfn_find(cid, hoid, &path);
+  Index index;
+  int r = get_index(cid, &index);
+  if (r < 0)
+    return r;
+
+  assert(NULL != index.index);
+  RWLock::WLocker l((index.index)->access_lock);
+
+  r = lfn_find(hoid, index);
+
   if (r < 0)
     return r;
   return object_map->set_header(hoid, bl, &spos);
@@ -4751,8 +4935,15 @@ int FileStore::_split_collection(coll_t cid,
     if (!r)
       r = get_index(dest, &to);
 
-    if (!r)
-      r = from->split(rem, bits, to);
+    if (!r) {
+      assert(NULL != from.index);
+      RWLock::WLocker l1((from.index)->access_lock);
+
+      assert(NULL != to.index);
+      RWLock::WLocker l2((to.index)->access_lock);
+      
+      r = from->split(rem, bits, to.index);
+    }
 
     _close_replay_guard(cid, spos);
     _close_replay_guard(dest, spos);
@@ -4832,8 +5023,15 @@ int FileStore::_split_collection_create(coll_t cid,
   if (!r) 
     r = get_index(dest, &to);
 
-  if (!r) 
-    r = from->split(rem, bits, to);
+  if (!r) {
+    assert(NULL != from.index);
+    RWLock::WLocker l1((from.index)->access_lock);
+
+    assert(NULL != to.index);
+    RWLock::WLocker l2((to.index)->access_lock);
+    r = from->split(rem, bits, to.index);
+  }
 
   _close_replay_guard(cid, spos);
   _close_replay_guard(dest, spos);
index 34c46c55b5d45dc65cb7ae3904b7a1a9b631fc62..fcb4796d3b973195f673b83648c3704b99d1eeeb 100644 (file)
@@ -382,7 +382,8 @@ private:
   PerfCounters *logger;
 
 public:
-  int lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path);
+  int lfn_find(const ghobject_t& oid, const Index& index, 
+                                  IndexedPath *path = NULL);
   int lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length);
   int lfn_stat(coll_t cid, const ghobject_t& oid, struct stat *buf);
   int lfn_open(
@@ -390,8 +391,8 @@ public:
     const ghobject_t& oid,
     bool create,
     FDRef *outfd,
-    IndexedPath *path = 0,
     Index *index = 0);
+
   void lfn_close(FDRef fd);
   int lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghobject_t& newoid) ;
   int lfn_unlink(coll_t cid, const ghobject_t& o, const SequencerPosition &spos,
index 2f1a1d16c6fcc1acfa8c6970616dab3cd217e187..6dcb52ec884581e4e596ae7368283e0f99122efa 100644 (file)
@@ -18,7 +18,6 @@
 #endif
 
 #include "FlatIndex.h"
-#include "CollectionIndex.h"
 #include "common/ceph_crypto.h"
 #include "osd/osd_types.h"
 #include <errno.h>
@@ -49,9 +48,6 @@ using ceph::crypto::SHA1;
 
 #define FILENAME_PREFIX_LEN (FILENAME_SHORT_LEN - FILENAME_HASH_LEN - (sizeof(FILENAME_COOKIE) - 1) - FILENAME_EXTRA)
 
-void FlatIndex::set_ref(ceph::shared_ptr<CollectionIndex> ref) {
-  self_ref = ref;
-}
 
 int FlatIndex::cleanup() {
   return 0;
@@ -356,7 +352,7 @@ int FlatIndex::lookup(const ghobject_t &hoid, IndexedPath *path, int *exist) {
              sizeof(long_fn), exist, &is_lfn);
   if (r < 0)
     return r;
-  *path = IndexedPath(new Path(string(short_fn), self_ref));
+  *path = IndexedPath(new Path(string(short_fn), this));
   return 0;
 }
 
index e55bd142c065e26b4b56c71918ee6fdda5262f24..ba6b5c2d236f32aeb49697094f5eb4d904ca375e 100644 (file)
@@ -29,7 +29,6 @@
  * This class should only be used for converting old filestores.
  */
 class FlatIndex : public CollectionIndex {
-  ceph::weak_ptr<CollectionIndex> self_ref;
   string base_path;
   coll_t collection;
 public:
@@ -41,9 +40,6 @@ public:
 
   coll_t coll() const { return collection; }
 
-  /// @see CollectionIndex
-  void set_ref(ceph::shared_ptr<CollectionIndex> ref);
-
   /// @see CollectionIndex
   int cleanup();
 
index 35cb49bbe0013c1407d06623211b8dbd007741c2..346ee0d6154486f5393565110dde39aedc435b87 100644 (file)
@@ -228,12 +228,12 @@ int HashIndex::col_split_level(
 int HashIndex::_split(
   uint32_t match,
   uint32_t bits,
-  ceph::shared_ptr<CollectionIndex> dest) {
+  CollectionIndex* dest) {
   assert(collection_version() == dest->collection_version());
   unsigned mkdirred = 0;
   return col_split_level(
     *this,
-    *static_cast<HashIndex*>(dest.get()),
+    *static_cast<HashIndex*>(dest),
     vector<string>(),
     bits,
     match,
index 4bf5c317d871a069e55143abc465714d15d29b33..68bc147248f4e21990bcb1c7bb41ac5c4bf0a15d 100644 (file)
@@ -156,7 +156,7 @@ public:
   int _split(
     uint32_t match,
     uint32_t bits,
-    ceph::shared_ptr<CollectionIndex> dest
+    CollectionIndex* dest
     );
        
 protected:
index e8a3785865f4fd0b6123000837ff0c28f1dfcbf7..bc935323e8087a017d2c551a6f25d6ae7b387484 100644 (file)
@@ -61,13 +61,18 @@ static int get_version(const char *path, uint32_t *version) {
   return 0;
 }
 
-void IndexManager::put_index(coll_t c) {
-  Mutex::Locker l(lock);
-  assert(col_indices.count(c));
-  col_indices.erase(c);
-  cond.Signal();
+IndexManager::~IndexManager() {
+
+  for(map<coll_t, CollectionIndex* > ::iterator it = col_indices.begin(); 
+                                it != col_indices.end(); it++) {
+
+    delete it->second;
+    it->second = NULL;
+  }
+  col_indices.clear();
 }
 
+
 int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
   Mutex::Locker l(lock);
   int r = set_version(path, version);
@@ -80,7 +85,7 @@ int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
   return index.init();
 }
 
-int IndexManager::build_index(coll_t c, const char *path, Index *index) {
+int IndexManager::build_index(coll_t c, const char *path, CollectionIndex **index) {
   if (upgrade) {
     // Need to check the collection generation
     int r;
@@ -91,17 +96,15 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
 
     switch (version) {
     case CollectionIndex::FLAT_INDEX_TAG: {
-      *index = Index(new FlatIndex(c, path),
-                    RemoveOnDelete(c, this));
+      *index = new FlatIndex(c, path);
       return 0;
     }
     case CollectionIndex::HASH_INDEX_TAG: // fall through
     case CollectionIndex::HASH_INDEX_TAG_2: // fall through
     case CollectionIndex::HOBJECT_WITH_POOL: {
       // Must be a HashIndex
-      *index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
-                                  g_conf->filestore_split_multiple, version), 
-                    RemoveOnDelete(c, this));
+      *index = new HashIndex(c, path, g_conf->filestore_merge_threshold,
+                                  g_conf->filestore_split_multiple, version);
       return 0;
     }
     default: assert(0);
@@ -109,28 +112,29 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
 
   } else {
     // No need to check
-    *index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
+    *index = new HashIndex(c, path, g_conf->filestore_merge_threshold,
                                 g_conf->filestore_split_multiple,
                                 CollectionIndex::HOBJECT_WITH_POOL,
-                                g_conf->filestore_index_retry_probability),
-                  RemoveOnDelete(c, this));
+                                g_conf->filestore_index_retry_probability);
     return 0;
   }
 }
 
-int IndexManager::get_index(coll_t c, const char *path, Index *index) {
+int IndexManager::get_index(coll_t c, const string& baseDir, Index *index) {
+
   Mutex::Locker l(lock);
-  while (1) {
-    if (!col_indices.count(c)) {
-      int r = build_index(c, path, index);
-      if (r < 0)
-       return r;
-      (*index)->set_ref(*index);
-      col_indices[c] = (*index);
-      break;
-    } else {
-      cond.Wait(lock);
-    }
+  map<coll_t, CollectionIndex* > ::iterator it = col_indices.find(c);
+  if (it == col_indices.end()) {
+    char path[PATH_MAX];
+    snprintf(path, sizeof(path), "%s/current/%s", baseDir.c_str(), c.to_str().c_str());
+    CollectionIndex* colIndex = NULL;
+    int r = build_index(c, path, &colIndex);
+    if (r < 0)
+      return r;
+    col_indices[c] = colIndex;
+    index->index = colIndex;
+  } else {
+    index->index = it->second;
   }
   return 0;
 }
index 2aa7c66a754f83a04194ecb8ffcb1093372490f8..7837fb76471fefeed3a87592e04118e98f8c8a2b 100644 (file)
 
 
 /// Public type for Index
-typedef ceph::shared_ptr<CollectionIndex> Index;
+struct Index {
+  CollectionIndex *index;
+
+  Index() : index(NULL) {}
+  Index(CollectionIndex* index) : index(index) {}
+
+  CollectionIndex *operator->() { return index; }
+  CollectionIndex &operator*() { return *index; }
+};
+
+
 /**
  * Encapsulates mutual exclusion for CollectionIndexes.
  *
@@ -37,39 +47,12 @@ typedef ceph::shared_ptr<CollectionIndex> Index;
  * that path) may result in the path becoming invalid.  Thus, during
  * the lifetime of a CollectionIndex object and any paths returned
  * by it, no other concurrent accesses may be allowed.
- *
- * This is enforced using shared_ptr.  A shared_ptr<CollectionIndex>
- * is returned from get_index.  Any paths generated using that object
- * carry a reference to the parrent index.  Once all
- * shared_ptr<CollectionIndex> references have expired, the destructor
- * removes the weak_ptr from col_indices and wakes waiters.
+ * This is enforced by using CollectionIndex::access_lock
  */
 class IndexManager {
   Mutex lock; ///< Lock for Index Manager
-  Cond cond;  ///< Cond for waiters on col_indices
   bool upgrade;
-
-  /// Currently in use CollectionIndices
-  map<coll_t,ceph::weak_ptr<CollectionIndex> > col_indices;
-
-  /// Cleans up state for c @see RemoveOnDelete
-  void put_index(
-    coll_t c ///< Put the index for c
-    );
-
-  /// Callback for shared_ptr release @see get_index
-  class RemoveOnDelete {
-  public:
-    coll_t c;
-    IndexManager *manager;
-    RemoveOnDelete(coll_t c, IndexManager *manager) : 
-      c(c), manager(manager) {}
-
-    void operator()(CollectionIndex *index) {
-      manager->put_index(c);
-      delete index;
-    }
-  };
+  map<coll_t, CollectionIndex* > col_indices;
 
   /**
    * Index factory
@@ -82,12 +65,14 @@ class IndexManager {
    * @param [out] index Index for c
    * @return error code
    */
-  int build_index(coll_t c, const char *path, Index *index);
+  int build_index(coll_t c, const char *path, CollectionIndex **index);
 public:
   /// Constructor
   IndexManager(bool upgrade) : lock("IndexManager lock"),
                               upgrade(upgrade) {}
 
+  ~IndexManager();
+
   /**
    * Reserve and return index for c
    *
@@ -96,7 +81,7 @@ public:
    * @param [out] index Index for c
    * @return error code
    */
-  int get_index(coll_t c, const char *path, Index *index);
+  int get_index(coll_t c, const string& baseDir, Index *index);
 
   /**
    * Initialize index for collection c at path
index 1e11a30903083948842259eb2823998ff3e5c70f..755faa4665beb340db57c40f334375ab12e87c79 100644 (file)
@@ -74,10 +74,6 @@ struct FDCloser {
 
 /* Public methods */
 
-void LFNIndex::set_ref(ceph::shared_ptr<CollectionIndex> ref)
-{
-  self_ref = ref;
-}
 
 int LFNIndex::init()
 {
@@ -142,7 +138,7 @@ int LFNIndex::lookup(const ghobject_t &oid,
   } else {
     *exist = 1;
   }
-  *out_path = IndexedPath(new Path(full_path, self_ref));
+  *out_path = IndexedPath(new Path(full_path, this));
   r = 0;
   );
 }
index 646e7267bb7e8e1572b8f8ef583d3c0ed62cb404..dad6c39d688f6c70a50da40454cec3cf3055aa1b 100644 (file)
@@ -98,8 +98,6 @@ class LFNIndex : public CollectionIndex {
 
   /// Path to Index base.
   const string base_path;
-  /// For reference counting the collection @see Path
-  ceph::weak_ptr<CollectionIndex> self_ref;
 
 protected:
   const uint32_t index_version;
@@ -155,9 +153,6 @@ public:
   /// Virtual destructor
   virtual ~LFNIndex() {}
 
-  /// @see CollectionIndex
-  void set_ref(ceph::shared_ptr<CollectionIndex> ref);
-
   /// @see CollectionIndex
   int init();
 
@@ -200,14 +195,14 @@ public:
   virtual int _split(
     uint32_t match,                             //< [in] value to match
     uint32_t bits,                              //< [in] bits to check
-    ceph::shared_ptr<CollectionIndex> dest  //< [in] destination index
+    CollectionIndex* dest                       //< [in] destination index
     ) = 0;
   
   /// @see CollectionIndex
   int split(
     uint32_t match,
     uint32_t bits,
-    ceph::shared_ptr<CollectionIndex> dest
+    CollectionIndex* dest
     ) {
     WRAP_RETRY(
       r = _split(match, bits, dest);
index 7bb2d42f225a9454f2607b49e7fb4b35680c5566..600120a09e167251282bbc1a734e5e083eea899b 100644 (file)
@@ -70,7 +70,6 @@ TEST(FlatIndex, created_unlink) {
   //
   {
     CollectionIndex::IndexedPath indexed_path;
-    index->set_ref(index);
     const std::string object_name(10, 'A');
     ghobject_t hoid(hobject_t(object_t(object_name), key, CEPH_NOSNAP, hash, pool, ""));
     int exists;
@@ -88,7 +87,6 @@ TEST(FlatIndex, created_unlink) {
   //
   {
     CollectionIndex::IndexedPath indexed_path;
-    index->set_ref(index);
     const std::string object_name(1024, 'A');
     ghobject_t hoid(hobject_t(object_t(object_name), key, CEPH_NOSNAP, hash, pool, ""));
     int exists;
index 47ccf640aefae41040ce3fef9fd61770255d5002..3e9f970ceb66dbd262bccee415805fe5c93b201c 100644 (file)
@@ -42,7 +42,7 @@ public:
   virtual int _split(
                     uint32_t match,                           
                     uint32_t bits,                            
-                    ceph::shared_ptr<CollectionIndex> dest
+                    CollectionIndex* dest
                     ) { return 0; }
 
   void test_generate_and_parse(const ghobject_t &hoid, const std::string &mangled_expected) {