]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
DBObjectMap: restructure map_header_in_use locking 2517/head
authorSamuel Just <sam.just@inktank.com>
Wed, 10 Sep 2014 23:50:37 +0000 (16:50 -0700)
committerSamuel Just <sam.just@inktank.com>
Fri, 12 Sep 2014 20:40:01 +0000 (13:40 -0700)
Tieing map_header_in_use to a _Header is a mistake since ownership of
the map_header_in_use can move (clone).  Instead, grab the
map_header_in_use entry at the top of each call chain and release at the
end.  To facilitate that, we introduce a MapHeaderLock object and
require that it be passed to the various map header manipulation
methods.

Fixes: #9326
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/DBObjectMap.cc
src/os/DBObjectMap.h

index 1b8fc2ece8dc7980259ee6a440e9bca089a587f6..c5a5b69c95b670bb1af14e925b736855280bb9ee 100644 (file)
@@ -319,10 +319,13 @@ int DBObjectMap::DBObjectMapIteratorImpl::init()
 ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
   const ghobject_t &oid)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return ObjectMapIterator(new EmptyIteratorImpl());
-  return _get_iterator(header);
+  DBObjectMapIterator iter = _get_iterator(header);
+  iter->hlock.swap(hl);
+  return iter;
 }
 
 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
@@ -507,7 +510,8 @@ int DBObjectMap::set_keys(const ghobject_t &oid,
                          const SequencerPosition *spos)
 {
   KeyValueDB::Transaction t = db->get_transaction();
-  Header header = lookup_create_map_header(oid, t);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_create_map_header(hl, oid, t);
   if (!header)
     return -EINVAL;
   if (check_spos(oid, header, spos))
@@ -523,7 +527,8 @@ int DBObjectMap::set_header(const ghobject_t &oid,
                            const SequencerPosition *spos)
 {
   KeyValueDB::Transaction t = db->get_transaction();
-  Header header = lookup_create_map_header(oid, t);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_create_map_header(hl, oid, t);
   if (!header)
     return -EINVAL;
   if (check_spos(oid, header, spos))
@@ -543,7 +548,8 @@ void DBObjectMap::_set_header(Header header, const bufferlist &bl,
 int DBObjectMap::get_header(const ghobject_t &oid,
                            bufferlist *bl)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header) {
     return 0;
   }
@@ -578,12 +584,13 @@ int DBObjectMap::clear(const ghobject_t &oid,
                       const SequencerPosition *spos)
 {
   KeyValueDB::Transaction t = db->get_transaction();
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   if (check_spos(oid, header, spos))
     return 0;
-  remove_map_header(oid, header, t);
+  remove_map_header(hl, oid, header, t);
   assert(header->num_children > 0);
   header->num_children--;
   int r = _clear(header, t);
@@ -698,7 +705,8 @@ int DBObjectMap::rm_keys(const ghobject_t &oid,
                         const set<string> &to_clear,
                         const SequencerPosition *spos)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   KeyValueDB::Transaction t = db->get_transaction();
@@ -762,7 +770,7 @@ int DBObjectMap::rm_keys(const ghobject_t &oid,
     parent->num_children--;
     _clear(parent, t);
     header->parent = 0;
-    set_map_header(oid, *header, t);
+    set_map_header(hl, oid, *header, t);
     t->rmkeys_by_prefix(complete_prefix(header));
   }
   return db->submit_transaction(t);
@@ -772,7 +780,8 @@ int DBObjectMap::clear_keys_header(const ghobject_t &oid,
                                   const SequencerPosition *spos)
 {
   KeyValueDB::Transaction t = db->get_transaction();
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   if (check_spos(oid, header, spos))
@@ -789,7 +798,7 @@ int DBObjectMap::clear_keys_header(const ghobject_t &oid,
     return iter->status();
 
   // remove current header
-  remove_map_header(oid, header, t);
+  remove_map_header(hl, oid, header, t);
   assert(header->num_children > 0);
   header->num_children--;
   int r = _clear(header, t);
@@ -798,7 +807,7 @@ int DBObjectMap::clear_keys_header(const ghobject_t &oid,
 
   // create new header
   Header newheader = generate_new_header(oid, Header());
-  set_map_header(oid, *newheader, t);
+  set_map_header(hl, oid, *newheader, t);
   if (!attrs.empty())
     t->set(xattr_prefix(newheader), attrs);
   return db->submit_transaction(t);
@@ -808,7 +817,8 @@ int DBObjectMap::get(const ghobject_t &oid,
                     bufferlist *_header,
                     map<string, bufferlist> *out)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   _get_header(header, _header);
@@ -824,7 +834,8 @@ int DBObjectMap::get(const ghobject_t &oid,
 int DBObjectMap::get_keys(const ghobject_t &oid,
                          set<string> *keys)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   ObjectMapIterator iter = get_iterator(oid);
@@ -862,7 +873,8 @@ int DBObjectMap::get_values(const ghobject_t &oid,
                            const set<string> &keys,
                            map<string, bufferlist> *out)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   return scan(header, keys, 0, out);
@@ -872,7 +884,8 @@ int DBObjectMap::check_keys(const ghobject_t &oid,
                            const set<string> &keys,
                            set<string> *out)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   return scan(header, keys, out, 0);
@@ -882,7 +895,8 @@ int DBObjectMap::get_xattrs(const ghobject_t &oid,
                            const set<string> &to_get,
                            map<string, bufferlist> *out)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   return db->get(xattr_prefix(header), to_get, out);
@@ -891,7 +905,8 @@ int DBObjectMap::get_xattrs(const ghobject_t &oid,
 int DBObjectMap::get_all_xattrs(const ghobject_t &oid,
                                set<string> *out)
 {
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
@@ -907,7 +922,8 @@ int DBObjectMap::set_xattrs(const ghobject_t &oid,
                            const SequencerPosition *spos)
 {
   KeyValueDB::Transaction t = db->get_transaction();
-  Header header = lookup_create_map_header(oid, t);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_create_map_header(hl, oid, t);
   if (!header)
     return -EINVAL;
   if (check_spos(oid, header, spos))
@@ -921,7 +937,8 @@ int DBObjectMap::remove_xattrs(const ghobject_t &oid,
                               const SequencerPosition *spos)
 {
   KeyValueDB::Transaction t = db->get_transaction();
-  Header header = lookup_map_header(oid);
+  MapHeaderLock hl(this, oid);
+  Header header = lookup_map_header(hl, oid);
   if (!header)
     return -ENOENT;
   if (check_spos(oid, header, spos))
@@ -937,11 +954,22 @@ int DBObjectMap::clone(const ghobject_t &oid,
   if (oid == target)
     return 0;
 
+  MapHeaderLock _l1(this, MIN(oid, target));
+  MapHeaderLock _l2(this, MAX(oid, target));
+  MapHeaderLock *lsource, *ltarget;
+  if (oid > target) {
+    lsource = &_l2;
+    ltarget= &_l1;
+  } else {
+    lsource = &_l1;
+    ltarget= &_l2;
+  }
+
   KeyValueDB::Transaction t = db->get_transaction();
   {
-    Header destination = lookup_map_header(target);
+    Header destination = lookup_map_header(*ltarget, target);
     if (destination) {
-      remove_map_header(target, destination, t);
+      remove_map_header(*ltarget, target, destination, t);
       if (check_spos(target, destination, spos))
        return 0;
       destination->num_children--;
@@ -949,7 +977,7 @@ int DBObjectMap::clone(const ghobject_t &oid,
     }
   }
 
-  Header parent = lookup_map_header(oid);
+  Header parent = lookup_map_header(*lsource, oid);
   if (!parent)
     return db->submit_transaction(t);
 
@@ -960,8 +988,8 @@ int DBObjectMap::clone(const ghobject_t &oid,
 
   parent->num_children = 2;
   set_header(parent, t);
-  set_map_header(oid, *source, t);
-  set_map_header(target, *destination, t);
+  set_map_header(*lsource, oid, *source, t);
+  set_map_header(*ltarget, target, *destination, t);
 
   map<string, bufferlist> to_set;
   KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(parent));
@@ -1086,12 +1114,13 @@ int DBObjectMap::sync(const ghobject_t *oid,
   write_state(t);
   if (oid) {
     assert(spos);
-    Header header = lookup_map_header(*oid);
+    MapHeaderLock hl(this, *oid);
+    Header header = lookup_map_header(hl, *oid);
     if (header) {
       dout(10) << "oid: " << *oid << " setting spos to "
               << *spos << dendl;
       header->spos = *spos;
-      set_map_header(*oid, *header, t);
+      set_map_header(hl, *oid, *header, t);
     }
   }
   return db->submit_transaction_sync(t);
@@ -1109,16 +1138,19 @@ int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
 }
 
 
-DBObjectMap::Header DBObjectMap::_lookup_map_header(const ghobject_t &oid)
+DBObjectMap::Header DBObjectMap::_lookup_map_header(
+  const MapHeaderLock &l,
+  const ghobject_t &oid)
 {
-  while (map_header_in_use.count(oid))
-    header_cond.Wait(header_lock);
+  assert(l.get_locked() == oid);
 
   _Header *header = new _Header();
   {
     Mutex::Locker l(cache_lock);
     if (caches.lookup(oid, header)) {
-      return Header(header, RemoveMapHeaderOnDelete(this, oid));
+      assert(!in_use.count(header->seq));
+      in_use.insert(header->seq);
+      return Header(header, RemoveOnDelete(this));
     }
   }
 
@@ -1131,7 +1163,7 @@ DBObjectMap::Header DBObjectMap::_lookup_map_header(const ghobject_t &oid)
     return Header();
   }
 
-  Header ret(header, RemoveMapHeaderOnDelete(this, oid));
+  Header ret(header, RemoveOnDelete(this));
   bufferlist::iterator iter = out.begin()->second.begin();
   ret->decode(iter);
   {
@@ -1139,6 +1171,8 @@ DBObjectMap::Header DBObjectMap::_lookup_map_header(const ghobject_t &oid)
     caches.add(oid, *ret);
   }
 
+  assert(!in_use.count(header->seq));
+  in_use.insert(header->seq);
   return ret;
 }
 
@@ -1192,14 +1226,15 @@ DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
 }
 
 DBObjectMap::Header DBObjectMap::lookup_create_map_header(
+  const MapHeaderLock &hl,
   const ghobject_t &oid,
   KeyValueDB::Transaction t)
 {
   Mutex::Locker l(header_lock);
-  Header header = _lookup_map_header(oid);
+  Header header = _lookup_map_header(hl, oid);
   if (!header) {
     header = _generate_new_header(oid, Header());
-    set_map_header(oid, *header, t);
+    set_map_header(hl, oid, *header, t);
   }
   return header;
 }
@@ -1224,10 +1259,13 @@ void DBObjectMap::set_header(Header header, KeyValueDB::Transaction t)
   t->set(sys_prefix(header), to_write);
 }
 
-void DBObjectMap::remove_map_header(const ghobject_t &oid,
-                                   Header header,
-                                   KeyValueDB::Transaction t)
+void DBObjectMap::remove_map_header(
+  const MapHeaderLock &l,
+  const ghobject_t &oid,
+  Header header,
+  KeyValueDB::Transaction t)
 {
+  assert(l.get_locked() == oid);
   dout(20) << "remove_map_header: removing " << header->seq
           << " oid " << oid << dendl;
   set<string> to_remove;
@@ -1239,9 +1277,12 @@ void DBObjectMap::remove_map_header(const ghobject_t &oid,
   }
 }
 
-void DBObjectMap::set_map_header(const ghobject_t &oid, _Header header,
-                                KeyValueDB::Transaction t)
+void DBObjectMap::set_map_header(
+  const MapHeaderLock &l,
+  const ghobject_t &oid, _Header header,
+  KeyValueDB::Transaction t)
 {
+  assert(l.get_locked() == oid);
   dout(20) << "set_map_header: setting " << header.seq
           << " oid " << oid << " parent seq "
           << header.parent << dendl;
index 410a3439fec9786d01cc676e5fb1f1ee4f561d8e..f20c6e8efc9a10403ff889efe6fa3ca2d5cff2a9 100644 (file)
@@ -17,6 +17,7 @@
 #include "common/Mutex.h"
 #include "common/Cond.h"
 #include "common/simple_cache.hpp"
+#include <boost/optional.hpp>
 
 /**
  * DBObjectMap: Implements ObjectMap in terms of KeyValueDB
@@ -69,6 +70,49 @@ public:
   set<uint64_t> in_use;
   set<ghobject_t> map_header_in_use;
 
+  /**
+   * Takes the map_header_in_use entry in constructor, releases in
+   * destructor
+   */
+  class MapHeaderLock {
+    DBObjectMap *db;
+    boost::optional<ghobject_t> locked;
+
+    MapHeaderLock(const MapHeaderLock &);
+    MapHeaderLock &operator=(const MapHeaderLock &);
+  public:
+    MapHeaderLock(DBObjectMap *db) : db(db) {}
+    MapHeaderLock(DBObjectMap *db, const ghobject_t &oid) : db(db), locked(oid) {
+      Mutex::Locker l(db->header_lock);
+      while (db->map_header_in_use.count(*locked))
+       db->map_header_cond.Wait(db->header_lock);
+      db->map_header_in_use.insert(*locked);
+    }
+
+    const ghobject_t &get_locked() const {
+      assert(locked);
+      return *locked;
+    }
+
+    void swap(MapHeaderLock &o) {
+      assert(db == o.db);
+
+      // centos6's boost optional doesn't seem to have swap :(
+      boost::optional<ghobject_t> _locked = o.locked;
+      o.locked = locked;
+      locked = _locked;
+    }
+
+    ~MapHeaderLock() {
+      if (locked) {
+       Mutex::Locker l(db->header_lock);
+       assert(db->map_header_in_use.count(*locked));
+       db->map_header_cond.Signal();
+       db->map_header_in_use.erase(*locked);
+      }
+    }
+  };
+
   DBObjectMap(KeyValueDB *db) : db(db), header_lock("DBOBjectMap"),
                                 cache_lock("DBObjectMap::CacheLock"),
                                 caches(g_conf->filestore_omap_header_cache_size)
@@ -316,6 +360,8 @@ private:
   public:
     DBObjectMap *map;
 
+    /// NOTE: implicit lock hlock->get_locked() when returned out of the class
+    MapHeaderLock hlock;
     /// NOTE: implicit lock on header->seq AND for all ancestors
     Header header;
 
@@ -334,7 +380,7 @@ private:
     bool invalid;
 
     DBObjectMapIteratorImpl(DBObjectMap *map, Header header) :
-      map(map), header(header), r(0), ready(false), invalid(true) {}
+      map(map), hlock(map), header(header), r(0), ready(false), invalid(true) {}
     int seek_to_first();
     int seek_to_last();
     int upper_bound(const string &after);
@@ -378,13 +424,17 @@ private:
   void set_header(Header input, KeyValueDB::Transaction t);
 
   /// Remove leaf node corresponding to oid in c
-  void remove_map_header(const ghobject_t &oid,
-                        Header header,
-                        KeyValueDB::Transaction t);
+  void remove_map_header(
+    const MapHeaderLock &l,
+    const ghobject_t &oid,
+    Header header,
+    KeyValueDB::Transaction t);
 
   /// Set leaf node for c and oid to the value of header
-  void set_map_header(const ghobject_t &oid, _Header header,
-                     KeyValueDB::Transaction t);
+  void set_map_header(
+    const MapHeaderLock &l,
+    const ghobject_t &oid, _Header header,
+    KeyValueDB::Transaction t);
 
   /// Set leaf node for c and oid to the value of header
   bool check_spos(const ghobject_t &oid,
@@ -392,8 +442,10 @@ private:
                  const SequencerPosition *spos);
 
   /// Lookup or create header for c oid
-  Header lookup_create_map_header(const ghobject_t &oid,
-                                 KeyValueDB::Transaction t);
+  Header lookup_create_map_header(
+    const MapHeaderLock &l,
+    const ghobject_t &oid,
+    KeyValueDB::Transaction t);
 
   /**
    * Generate new header for c oid with new seq number
@@ -407,10 +459,14 @@ private:
   }
 
   /// Lookup leaf header for c oid
-  Header _lookup_map_header(const ghobject_t &oid);
-  Header lookup_map_header(const ghobject_t &oid) {
+  Header _lookup_map_header(
+    const MapHeaderLock &l,
+    const ghobject_t &oid);
+  Header lookup_map_header(
+    const MapHeaderLock &l2,
+    const ghobject_t &oid) {
     Mutex::Locker l(header_lock);
-    return _lookup_map_header(oid);
+    return _lookup_map_header(l2, oid);
   }
 
   /// Lookup header node for input
@@ -451,25 +507,8 @@ private:
                   KeyValueDB::Transaction t);
 
   /** 
-   * Removes map header lock once Header is out of scope
-   * @see lookup_map_header
-   */
-  class RemoveMapHeaderOnDelete {
-  public:
-    DBObjectMap *db;
-    ghobject_t oid;
-    RemoveMapHeaderOnDelete(DBObjectMap *db, const ghobject_t &oid) :
-      db(db), oid(oid) {}
-    void operator() (_Header *header) {
-      Mutex::Locker l(db->header_lock);
-      db->map_header_in_use.erase(oid);
-      db->map_header_cond.Signal();
-      delete header;
-    }
-  };
-
-  /** 
-   * Removes header seq lock once Header is out of scope
+   * Removes header seq lock and possibly object lock
+   * once Header is out of scope
    * @see lookup_parent
    * @see generate_new_header
    */
@@ -480,6 +519,7 @@ private:
       db(db) {}
     void operator() (_Header *header) {
       Mutex::Locker l(db->header_lock);
+      assert(db->in_use.count(header->seq));
       db->in_use.erase(header->seq);
       db->header_cond.Signal();
       delete header;