]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore/BlueStore: use std::mutex et al
authorSage Weil <sage@redhat.com>
Wed, 27 Jan 2016 18:47:39 +0000 (13:47 -0500)
committerSage Weil <sage@redhat.com>
Wed, 27 Jan 2016 18:55:42 +0000 (13:55 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index 5b4fc409ffbe06df6d66f93b35c608f9778a3626..8beaa8496f6b912b33f2e0ec0dd3a9af78f9f654 100644 (file)
@@ -472,16 +472,15 @@ BlueStore::Onode::Onode(const ghobject_t& o, const string& k)
     oid(o),
     key(k),
     dirty(false),
-    exists(true),
-    flush_lock("BlueStore::Onode::flush_lock") {
+    exists(true) {
 }
 
 void BlueStore::Onode::flush()
 {
-  Mutex::Locker l(flush_lock);
+  std::unique_lock<std::mutex> l(flush_lock);
   dout(20) << __func__ << " " << flush_txns << dendl;
   while (!flush_txns.empty())
-    flush_cond.Wait(flush_lock);
+    flush_cond.wait(l);
   dout(20) << __func__ << " done" << dendl;
 }
 
@@ -499,7 +498,7 @@ void BlueStore::OnodeHashLRU::_touch(OnodeRef o)
 
 void BlueStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard<std::mutex> l(lock);
   dout(30) << __func__ << " " << oid << " " << o << dendl;
   assert(onode_map.count(oid) == 0);
   onode_map[oid] = o;
@@ -508,7 +507,7 @@ void BlueStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
 
 BlueStore::OnodeRef BlueStore::OnodeHashLRU::lookup(const ghobject_t& oid)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard<std::mutex> l(lock);
   dout(30) << __func__ << dendl;
   ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
   if (p == onode_map.end()) {
@@ -522,7 +521,7 @@ BlueStore::OnodeRef BlueStore::OnodeHashLRU::lookup(const ghobject_t& oid)
 
 void BlueStore::OnodeHashLRU::clear()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard<std::mutex> l(lock);
   dout(10) << __func__ << dendl;
   lru.clear();
   onode_map.clear();
@@ -531,7 +530,7 @@ void BlueStore::OnodeHashLRU::clear()
 void BlueStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
                                    const ghobject_t& new_oid)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard<std::mutex> l(lock);
   dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
   ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
   po = onode_map.find(old_oid);
@@ -563,7 +562,7 @@ bool BlueStore::OnodeHashLRU::get_next(
   const ghobject_t& after,
   pair<ghobject_t,OnodeRef> *next)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard<std::mutex> l(lock);
   dout(20) << __func__ << " after " << after << dendl;
 
   if (after == ghobject_t()) {
@@ -591,7 +590,7 @@ bool BlueStore::OnodeHashLRU::get_next(
 
 int BlueStore::OnodeHashLRU::trim(int max)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard<std::mutex> l(lock);
   dout(20) << __func__ << " max " << max
           << " size " << onode_map.size() << dendl;
   int trimmed = 0;
@@ -756,7 +755,6 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
     fsid_fd(-1),
     mounted(false),
     coll_lock("BlueStore::coll_lock"),
-    nid_lock("BlueStore::nid_lock"),
     nid_max(0),
     throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops),
     throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes),
@@ -766,7 +764,6 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
     throttle_wal_bytes(cct, "bluestore_wal_max_bytes",
                       cct->_conf->bluestore_max_bytes +
                       cct->_conf->bluestore_wal_max_bytes),
-    wal_lock("BlueStore::wal_lock"),
     wal_seq(0),
     wal_tp(cct,
           "BlueStore::wal_tp",
@@ -779,10 +776,8 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
             &wal_tp),
     finisher(cct),
     kv_sync_thread(this),
-    kv_lock("BlueStore::kv_lock"),
     kv_stop(false),
-    logger(NULL),
-    reap_lock("BlueStore::reap_lock")
+    logger(NULL)
 {
   _init_logger();
 }
@@ -2255,13 +2250,12 @@ void BlueStore::_sync()
   // flush aios in flght
   bdev->flush();
 
-  kv_lock.Lock();
+  std::unique_lock<std::mutex> l(kv_lock);
   while (!kv_committing.empty() ||
         !kv_queue.empty()) {
     dout(20) << " waiting for kv to commit" << dendl;
-    kv_sync_cond.Wait(kv_lock);
+    kv_sync_cond.wait(l);
   }
-  kv_lock.Unlock();
 
   dout(10) << __func__ << " done" << dendl;
 }
@@ -2293,17 +2287,17 @@ BlueStore::CollectionRef BlueStore::_get_collection(coll_t cid)
 void BlueStore::_queue_reap_collection(CollectionRef& c)
 {
   dout(10) << __func__ << " " << c->cid << dendl;
-  Mutex::Locker l(reap_lock);
+  std::lock_guard<std::mutex> l(reap_lock);
   removed_collections.push_back(c);
 }
 
 void BlueStore::_reap_collections()
 {
-  reap_lock.Lock();
-
   list<CollectionRef> removed_colls;
-  removed_colls.swap(removed_collections);
-  reap_lock.Unlock();
+  {
+    std::lock_guard<std::mutex> l(reap_lock);
+    removed_colls.swap(removed_collections);
+  }
 
   for (list<CollectionRef>::iterator p = removed_colls.begin();
        p != removed_colls.end();
@@ -2512,6 +2506,7 @@ int BlueStore::_do_read(
        if (r < 0) {
          goto out;
        }
+       r = r_len;
        bufferlist u;
        u.substr_of(t, front_extra, x_len);
        bl.claim_append(u);
@@ -3218,7 +3213,7 @@ void BlueStore::_assign_nid(TransContext *txc, OnodeRef o)
 {
   if (o->onode.nid)
     return;
-  Mutex::Locker l(nid_lock);
+  std::lock_guard<std::mutex> l(nid_lock);
   o->onode.nid = ++nid_last;
   dout(20) << __func__ << " " << o->onode.nid << dendl;
   if (nid_last > nid_max) {
@@ -3281,15 +3276,15 @@ void BlueStore::_txc_state_proc(TransContext *txc)
       return;
 
     case TransContext::STATE_IO_DONE:
-      assert(txc->osr->qlock.is_locked());  // see _txc_finish_io
+      //assert(txc->osr->qlock.is_locked());  // see _txc_finish_io
       txc->state = TransContext::STATE_KV_QUEUED;
       if (!g_conf->bluestore_sync_transaction) {
-       Mutex::Locker l(kv_lock);
+       std::lock_guard<std::mutex> l(kv_lock);
        if (g_conf->bluestore_sync_submit_transaction) {
          db->submit_transaction(txc->t);
        }
        kv_queue.push_back(txc);
-       kv_cond.SignalOne();
+       kv_cond.notify_one();
        return;
       }
       db->submit_transaction_sync(txc->t);
@@ -3352,7 +3347,7 @@ void BlueStore::_txc_finish_io(TransContext *txc)
    */
 
   OpSequencer *osr = txc->osr.get();
-  Mutex::Locker l(osr->qlock);
+  std::lock_guard<std::mutex> l(osr->qlock);
   txc->state = TransContext::STATE_IO_DONE;
 
   OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
@@ -3388,7 +3383,7 @@ int BlueStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
     dout(20) << " onode " << (*p)->oid << " is " << bl.length() << dendl;
     txc->t->set(PREFIX_OBJ, (*p)->key, bl);
 
-    Mutex::Locker l((*p)->flush_lock);
+    std::lock_guard<std::mutex> l((*p)->flush_lock);
     (*p)->flush_txns.insert(txc);
   }
 
@@ -3456,13 +3451,13 @@ void BlueStore::_txc_finish(TransContext *txc)
   for (set<OnodeRef>::iterator p = txc->onodes.begin();
        p != txc->onodes.end();
        ++p) {
-    Mutex::Locker l((*p)->flush_lock);
+    std::lock_guard<std::mutex> l((*p)->flush_lock);
     dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
             << dendl;
     assert((*p)->flush_txns.count(txc));
     (*p)->flush_txns.erase(txc);
     if ((*p)->flush_txns.empty())
-      (*p)->flush_cond.Signal();
+      (*p)->flush_cond.notify_all();
   }
 
   // clear out refs
@@ -3477,16 +3472,17 @@ void BlueStore::_txc_finish(TransContext *txc)
   throttle_wal_bytes.put(txc->bytes);
 
   OpSequencerRef osr = txc->osr;
-  osr->qlock.Lock();
-  txc->state = TransContext::STATE_DONE;
-  osr->qlock.Unlock();
+  {
+    std::lock_guard<std::mutex> l(osr->qlock);
+    txc->state = TransContext::STATE_DONE;
+  }
 
   _osr_reap_done(osr.get());
 }
 
 void BlueStore::_osr_reap_done(OpSequencer *osr)
 {
-  Mutex::Locker l(osr->qlock);
+  std::lock_guard<std::mutex> l(osr->qlock);
   dout(20) << __func__ << " osr " << osr << dendl;
   while (!osr->q.empty()) {
     TransContext *txc = &osr->q.front();
@@ -3502,7 +3498,7 @@ void BlueStore::_osr_reap_done(OpSequencer *osr)
 
     osr->q.pop_front();
     delete txc;
-    osr->qcond.Signal();
+    osr->qcond.notify_all();
     if (osr->q.empty())
       dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
   }
@@ -3511,7 +3507,7 @@ void BlueStore::_osr_reap_done(OpSequencer *osr)
 void BlueStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
-  kv_lock.Lock();
+  std::unique_lock<std::mutex> l(kv_lock);
   while (true) {
     assert(kv_committing.empty());
     assert(wal_cleaning.empty());
@@ -3519,8 +3515,8 @@ void BlueStore::_kv_sync_thread()
       if (kv_stop)
        break;
       dout(20) << __func__ << " sleep" << dendl;
-      kv_sync_cond.Signal();
-      kv_cond.Wait(kv_lock);
+      kv_sync_cond.notify_all();
+      kv_cond.wait(l);
       dout(20) << __func__ << " wake" << dendl;
     } else {
       dout(20) << __func__ << " committing " << kv_queue.size()
@@ -3528,7 +3524,7 @@ void BlueStore::_kv_sync_thread()
       kv_committing.swap(kv_queue);
       wal_cleaning.swap(wal_cleanup_queue);
       utime_t start = ceph_clock_now(NULL);
-      kv_lock.Unlock();
+      l.unlock();
 
       dout(30) << __func__ << " committing txc " << kv_committing << dendl;
       dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;
@@ -3677,10 +3673,9 @@ void BlueStore::_kv_sync_thread()
        }
       }
 
-      kv_lock.Lock();
+      l.lock();
     }
   }
-  kv_lock.Unlock();
   dout(10) << __func__ << " finish" << dendl;
 }
 
@@ -3718,10 +3713,10 @@ int BlueStore::_wal_finish(TransContext *txc)
   bluestore_wal_transaction_t& wt = *txc->wal_txn;
   dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
 
-  Mutex::Locker l(kv_lock);
+  std::lock_guard<std::mutex> l(kv_lock);
   txc->state = TransContext::STATE_WAL_CLEANUP;
   wal_cleanup_queue.push_back(txc);
-  kv_cond.SignalOne();
+  kv_cond.notify_one();
   return 0;
 }
 
index 08d77cdc00f6ea898908a6615743edd11260eb07..d994d58f4e31db8ab320294f8609ea2433591f78 100644 (file)
@@ -19,6 +19,9 @@
 
 #include <unistd.h>
 
+#include <mutex>
+#include <condition_variable>
+
 #include <boost/intrusive/list.hpp>
 #include <boost/intrusive/unordered_set.hpp>
 #include <boost/functional/hash.hpp>
@@ -118,8 +121,8 @@ public:
     bool dirty;     // ???
     bool exists;
 
-    Mutex flush_lock;  ///< protect flush_txns
-    Cond flush_cond;   ///< wait here for unapplied txns
+    std::mutex flush_lock;  ///< protect flush_txns
+    std::condition_variable flush_cond;   ///< wait here for unapplied txns
     set<TransContext*> flush_txns;   ///< committing or wal txns
 
     uint64_t tail_offset;
@@ -151,11 +154,11 @@ public:
        boost::intrusive::list_member_hook<>,
        &Onode::lru_item> > lru_list_t;
 
-    Mutex lock;
+    std::mutex lock;
     ceph::unordered_map<ghobject_t,OnodeRef> onode_map;  ///< forward lookups
     lru_list_t lru;                                      ///< lru
 
-    OnodeHashLRU() : lock("BlueStore::OnodeHashLRU::lock") {}
+    OnodeHashLRU() {}
 
     void add(const ghobject_t& oid, OnodeRef o);
     void _touch(OnodeRef o);
@@ -307,8 +310,8 @@ public:
 
   class OpSequencer : public Sequencer_impl {
   public:
-    Mutex qlock;
-    Cond qcond;
+    std::mutex qlock;
+    std::condition_variable qcond;
     typedef boost::intrusive::list<
       TransContext,
       boost::intrusive::member_hook<
@@ -329,31 +332,31 @@ public:
 
     Sequencer *parent;
 
-    Mutex wal_apply_lock;
+    std::mutex wal_apply_mutex;
+    std::unique_lock<std::mutex> wal_apply_lock;
 
     OpSequencer()
        //set the qlock to to PTHREAD_MUTEX_RECURSIVE mode
-      : qlock("BlueStore::OpSequencer::qlock", true, false),
-       parent(NULL),
-       wal_apply_lock("BlueStore::OpSequencer::wal_apply_lock") {
+      : parent(NULL),
+       wal_apply_lock(wal_apply_mutex, std::defer_lock) {
     }
     ~OpSequencer() {
       assert(q.empty());
     }
 
     void queue_new(TransContext *txc) {
-      Mutex::Locker l(qlock);
+      std::lock_guard<std::mutex> l(qlock);
       q.push_back(*txc);
     }
 
     void flush() {
-      Mutex::Locker l(qlock);
+      std::unique_lock<std::mutex> l(qlock);
       while (!q.empty())
-       qcond.Wait(qlock);
+       qcond.wait(l);
     }
 
     bool flush_commit(Context *c) {
-      Mutex::Locker l(qlock);
+      std::lock_guard<std::mutex> l(qlock);
       if (q.empty()) {
        return true;
       }
@@ -418,12 +421,12 @@ public:
 
       // preserve wal ordering for this sequencer by taking the lock
       // while still holding the queue lock
-      i->osr->wal_apply_lock.Lock();
+      i->osr->wal_apply_lock.lock();
       return i;
     }
     void _process(TransContext *i, ThreadPool::TPHandle &handle) {
       store->_wal_apply(i);
-      i->osr->wal_apply_lock.Unlock();
+      i->osr->wal_apply_lock.unlock();
     }
     using ThreadPool::WorkQueue<TransContext>::_process;
     void _clear() {
@@ -468,7 +471,7 @@ private:
   RWLock coll_lock;    ///< rwlock to protect coll_map
   ceph::unordered_map<coll_t, CollectionRef> coll_map;
 
-  Mutex nid_lock;
+  std::mutex nid_lock;
   uint64_t nid_last;
   uint64_t nid_max;
 
@@ -477,7 +480,7 @@ private:
 
   interval_set<uint64_t> bluefs_extents;  ///< block extents owned by bluefs
 
-  Mutex wal_lock;
+  std::mutex wal_lock;
   atomic64_t wal_seq;
   ThreadPool wal_tp;
   WALWQ wal_wq;
@@ -485,15 +488,15 @@ private:
   Finisher finisher;
 
   KVSyncThread kv_sync_thread;
-  Mutex kv_lock;
-  Cond kv_cond, kv_sync_cond;
+  std::mutex kv_lock;
+  std::condition_variable kv_cond, kv_sync_cond;
   bool kv_stop;
   deque<TransContext*> kv_queue, kv_committing;
   deque<TransContext*> wal_cleanup_queue, wal_cleaning;
 
   Logger *logger;
 
-  Mutex reap_lock;
+  std::mutex reap_lock;
   list<CollectionRef> removed_collections;
 
 
@@ -564,9 +567,9 @@ private:
   void _kv_sync_thread();
   void _kv_stop() {
     {
-      Mutex::Locker l(kv_lock);
+      std::lock_guard<std::mutex> l(kv_lock);
       kv_stop = true;
-      kv_cond.Signal();
+      kv_cond.notify_all();
     }
     kv_sync_thread.join();
     kv_stop = false;