From: Sage Weil Date: Wed, 27 Jan 2016 18:47:39 +0000 (-0500) Subject: os/bluestore/BlueStore: use std::mutex et al X-Git-Tag: v10.0.4~101^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=df53b1674f335d4292cdf1b22095852eb404758e;p=ceph.git os/bluestore/BlueStore: use std::mutex et al Signed-off-by: Sage Weil --- diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 5b4fc409ffb..8beaa8496f6 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -472,16 +472,15 @@ BlueStore::Onode::Onode(const ghobject_t& o, const string& k) oid(o), key(k), dirty(false), - exists(true), - flush_lock("BlueStore::Onode::flush_lock") { + exists(true) { } void BlueStore::Onode::flush() { - Mutex::Locker l(flush_lock); + std::unique_lock l(flush_lock); dout(20) << __func__ << " " << flush_txns << dendl; while (!flush_txns.empty()) - flush_cond.Wait(flush_lock); + flush_cond.wait(l); dout(20) << __func__ << " done" << dendl; } @@ -499,7 +498,7 @@ void BlueStore::OnodeHashLRU::_touch(OnodeRef o) void BlueStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o) { - Mutex::Locker l(lock); + std::lock_guard l(lock); dout(30) << __func__ << " " << oid << " " << o << dendl; assert(onode_map.count(oid) == 0); onode_map[oid] = o; @@ -508,7 +507,7 @@ void BlueStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o) BlueStore::OnodeRef BlueStore::OnodeHashLRU::lookup(const ghobject_t& oid) { - Mutex::Locker l(lock); + std::lock_guard l(lock); dout(30) << __func__ << dendl; ceph::unordered_map::iterator p = onode_map.find(oid); if (p == onode_map.end()) { @@ -522,7 +521,7 @@ BlueStore::OnodeRef BlueStore::OnodeHashLRU::lookup(const ghobject_t& oid) void BlueStore::OnodeHashLRU::clear() { - Mutex::Locker l(lock); + std::lock_guard l(lock); dout(10) << __func__ << dendl; lru.clear(); onode_map.clear(); @@ -531,7 +530,7 @@ void BlueStore::OnodeHashLRU::clear() void BlueStore::OnodeHashLRU::rename(const ghobject_t& old_oid, const ghobject_t& new_oid) { - Mutex::Locker l(lock); + std::lock_guard l(lock); dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl; ceph::unordered_map::iterator po, pn; po = onode_map.find(old_oid); @@ -563,7 +562,7 @@ bool BlueStore::OnodeHashLRU::get_next( const ghobject_t& after, pair *next) { - Mutex::Locker l(lock); + std::lock_guard l(lock); dout(20) << __func__ << " after " << after << dendl; if (after == ghobject_t()) { @@ -591,7 +590,7 @@ bool BlueStore::OnodeHashLRU::get_next( int BlueStore::OnodeHashLRU::trim(int max) { - Mutex::Locker l(lock); + std::lock_guard l(lock); dout(20) << __func__ << " max " << max << " size " << onode_map.size() << dendl; int trimmed = 0; @@ -756,7 +755,6 @@ BlueStore::BlueStore(CephContext *cct, const string& path) fsid_fd(-1), mounted(false), coll_lock("BlueStore::coll_lock"), - nid_lock("BlueStore::nid_lock"), nid_max(0), throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops), throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes), @@ -766,7 +764,6 @@ BlueStore::BlueStore(CephContext *cct, const string& path) throttle_wal_bytes(cct, "bluestore_wal_max_bytes", cct->_conf->bluestore_max_bytes + cct->_conf->bluestore_wal_max_bytes), - wal_lock("BlueStore::wal_lock"), wal_seq(0), wal_tp(cct, "BlueStore::wal_tp", @@ -779,10 +776,8 @@ BlueStore::BlueStore(CephContext *cct, const string& path) &wal_tp), finisher(cct), kv_sync_thread(this), - kv_lock("BlueStore::kv_lock"), kv_stop(false), - logger(NULL), - reap_lock("BlueStore::reap_lock") + logger(NULL) { _init_logger(); } @@ -2255,13 +2250,12 @@ void BlueStore::_sync() // flush aios in flght bdev->flush(); - kv_lock.Lock(); + std::unique_lock l(kv_lock); while (!kv_committing.empty() || !kv_queue.empty()) { dout(20) << " waiting for kv to commit" << dendl; - kv_sync_cond.Wait(kv_lock); + kv_sync_cond.wait(l); } - kv_lock.Unlock(); dout(10) << __func__ << " done" << dendl; } @@ -2293,17 +2287,17 @@ BlueStore::CollectionRef BlueStore::_get_collection(coll_t cid) void BlueStore::_queue_reap_collection(CollectionRef& c) { dout(10) << __func__ << " " << c->cid << dendl; - Mutex::Locker l(reap_lock); + std::lock_guard l(reap_lock); removed_collections.push_back(c); } void BlueStore::_reap_collections() { - reap_lock.Lock(); - list removed_colls; - removed_colls.swap(removed_collections); - reap_lock.Unlock(); + { + std::lock_guard l(reap_lock); + removed_colls.swap(removed_collections); + } for (list::iterator p = removed_colls.begin(); p != removed_colls.end(); @@ -2512,6 +2506,7 @@ int BlueStore::_do_read( if (r < 0) { goto out; } + r = r_len; bufferlist u; u.substr_of(t, front_extra, x_len); bl.claim_append(u); @@ -3218,7 +3213,7 @@ void BlueStore::_assign_nid(TransContext *txc, OnodeRef o) { if (o->onode.nid) return; - Mutex::Locker l(nid_lock); + std::lock_guard l(nid_lock); o->onode.nid = ++nid_last; dout(20) << __func__ << " " << o->onode.nid << dendl; if (nid_last > nid_max) { @@ -3281,15 +3276,15 @@ void BlueStore::_txc_state_proc(TransContext *txc) return; case TransContext::STATE_IO_DONE: - assert(txc->osr->qlock.is_locked()); // see _txc_finish_io + //assert(txc->osr->qlock.is_locked()); // see _txc_finish_io txc->state = TransContext::STATE_KV_QUEUED; if (!g_conf->bluestore_sync_transaction) { - Mutex::Locker l(kv_lock); + std::lock_guard l(kv_lock); if (g_conf->bluestore_sync_submit_transaction) { db->submit_transaction(txc->t); } kv_queue.push_back(txc); - kv_cond.SignalOne(); + kv_cond.notify_one(); return; } db->submit_transaction_sync(txc->t); @@ -3352,7 +3347,7 @@ void BlueStore::_txc_finish_io(TransContext *txc) */ OpSequencer *osr = txc->osr.get(); - Mutex::Locker l(osr->qlock); + std::lock_guard l(osr->qlock); txc->state = TransContext::STATE_IO_DONE; OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc); @@ -3388,7 +3383,7 @@ int BlueStore::_txc_finalize(OpSequencer *osr, TransContext *txc) dout(20) << " onode " << (*p)->oid << " is " << bl.length() << dendl; txc->t->set(PREFIX_OBJ, (*p)->key, bl); - Mutex::Locker l((*p)->flush_lock); + std::lock_guard l((*p)->flush_lock); (*p)->flush_txns.insert(txc); } @@ -3456,13 +3451,13 @@ void BlueStore::_txc_finish(TransContext *txc) for (set::iterator p = txc->onodes.begin(); p != txc->onodes.end(); ++p) { - Mutex::Locker l((*p)->flush_lock); + std::lock_guard l((*p)->flush_lock); dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns << dendl; assert((*p)->flush_txns.count(txc)); (*p)->flush_txns.erase(txc); if ((*p)->flush_txns.empty()) - (*p)->flush_cond.Signal(); + (*p)->flush_cond.notify_all(); } // clear out refs @@ -3477,16 +3472,17 @@ void BlueStore::_txc_finish(TransContext *txc) throttle_wal_bytes.put(txc->bytes); OpSequencerRef osr = txc->osr; - osr->qlock.Lock(); - txc->state = TransContext::STATE_DONE; - osr->qlock.Unlock(); + { + std::lock_guard l(osr->qlock); + txc->state = TransContext::STATE_DONE; + } _osr_reap_done(osr.get()); } void BlueStore::_osr_reap_done(OpSequencer *osr) { - Mutex::Locker l(osr->qlock); + std::lock_guard l(osr->qlock); dout(20) << __func__ << " osr " << osr << dendl; while (!osr->q.empty()) { TransContext *txc = &osr->q.front(); @@ -3502,7 +3498,7 @@ void BlueStore::_osr_reap_done(OpSequencer *osr) osr->q.pop_front(); delete txc; - osr->qcond.Signal(); + osr->qcond.notify_all(); if (osr->q.empty()) dout(20) << __func__ << " osr " << osr << " q now empty" << dendl; } @@ -3511,7 +3507,7 @@ void BlueStore::_osr_reap_done(OpSequencer *osr) void BlueStore::_kv_sync_thread() { dout(10) << __func__ << " start" << dendl; - kv_lock.Lock(); + std::unique_lock l(kv_lock); while (true) { assert(kv_committing.empty()); assert(wal_cleaning.empty()); @@ -3519,8 +3515,8 @@ void BlueStore::_kv_sync_thread() if (kv_stop) break; dout(20) << __func__ << " sleep" << dendl; - kv_sync_cond.Signal(); - kv_cond.Wait(kv_lock); + kv_sync_cond.notify_all(); + kv_cond.wait(l); dout(20) << __func__ << " wake" << dendl; } else { dout(20) << __func__ << " committing " << kv_queue.size() @@ -3528,7 +3524,7 @@ void BlueStore::_kv_sync_thread() kv_committing.swap(kv_queue); wal_cleaning.swap(wal_cleanup_queue); utime_t start = ceph_clock_now(NULL); - kv_lock.Unlock(); + l.unlock(); dout(30) << __func__ << " committing txc " << kv_committing << dendl; dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl; @@ -3677,10 +3673,9 @@ void BlueStore::_kv_sync_thread() } } - kv_lock.Lock(); + l.lock(); } } - kv_lock.Unlock(); dout(10) << __func__ << " finish" << dendl; } @@ -3718,10 +3713,10 @@ int BlueStore::_wal_finish(TransContext *txc) bluestore_wal_transaction_t& wt = *txc->wal_txn; dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl; - Mutex::Locker l(kv_lock); + std::lock_guard l(kv_lock); txc->state = TransContext::STATE_WAL_CLEANUP; wal_cleanup_queue.push_back(txc); - kv_cond.SignalOne(); + kv_cond.notify_one(); return 0; } diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 08d77cdc00f..d994d58f4e3 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -19,6 +19,9 @@ #include +#include +#include + #include #include #include @@ -118,8 +121,8 @@ public: bool dirty; // ??? bool exists; - Mutex flush_lock; ///< protect flush_txns - Cond flush_cond; ///< wait here for unapplied txns + std::mutex flush_lock; ///< protect flush_txns + std::condition_variable flush_cond; ///< wait here for unapplied txns set flush_txns; ///< committing or wal txns uint64_t tail_offset; @@ -151,11 +154,11 @@ public: boost::intrusive::list_member_hook<>, &Onode::lru_item> > lru_list_t; - Mutex lock; + std::mutex lock; ceph::unordered_map onode_map; ///< forward lookups lru_list_t lru; ///< lru - OnodeHashLRU() : lock("BlueStore::OnodeHashLRU::lock") {} + OnodeHashLRU() {} void add(const ghobject_t& oid, OnodeRef o); void _touch(OnodeRef o); @@ -307,8 +310,8 @@ public: class OpSequencer : public Sequencer_impl { public: - Mutex qlock; - Cond qcond; + std::mutex qlock; + std::condition_variable qcond; typedef boost::intrusive::list< TransContext, boost::intrusive::member_hook< @@ -329,31 +332,31 @@ public: Sequencer *parent; - Mutex wal_apply_lock; + std::mutex wal_apply_mutex; + std::unique_lock wal_apply_lock; OpSequencer() //set the qlock to to PTHREAD_MUTEX_RECURSIVE mode - : qlock("BlueStore::OpSequencer::qlock", true, false), - parent(NULL), - wal_apply_lock("BlueStore::OpSequencer::wal_apply_lock") { + : parent(NULL), + wal_apply_lock(wal_apply_mutex, std::defer_lock) { } ~OpSequencer() { assert(q.empty()); } void queue_new(TransContext *txc) { - Mutex::Locker l(qlock); + std::lock_guard l(qlock); q.push_back(*txc); } void flush() { - Mutex::Locker l(qlock); + std::unique_lock l(qlock); while (!q.empty()) - qcond.Wait(qlock); + qcond.wait(l); } bool flush_commit(Context *c) { - Mutex::Locker l(qlock); + std::lock_guard l(qlock); if (q.empty()) { return true; } @@ -418,12 +421,12 @@ public: // preserve wal ordering for this sequencer by taking the lock // while still holding the queue lock - i->osr->wal_apply_lock.Lock(); + i->osr->wal_apply_lock.lock(); return i; } void _process(TransContext *i, ThreadPool::TPHandle &handle) { store->_wal_apply(i); - i->osr->wal_apply_lock.Unlock(); + i->osr->wal_apply_lock.unlock(); } using ThreadPool::WorkQueue::_process; void _clear() { @@ -468,7 +471,7 @@ private: RWLock coll_lock; ///< rwlock to protect coll_map ceph::unordered_map coll_map; - Mutex nid_lock; + std::mutex nid_lock; uint64_t nid_last; uint64_t nid_max; @@ -477,7 +480,7 @@ private: interval_set bluefs_extents; ///< block extents owned by bluefs - Mutex wal_lock; + std::mutex wal_lock; atomic64_t wal_seq; ThreadPool wal_tp; WALWQ wal_wq; @@ -485,15 +488,15 @@ private: Finisher finisher; KVSyncThread kv_sync_thread; - Mutex kv_lock; - Cond kv_cond, kv_sync_cond; + std::mutex kv_lock; + std::condition_variable kv_cond, kv_sync_cond; bool kv_stop; deque kv_queue, kv_committing; deque wal_cleanup_queue, wal_cleaning; Logger *logger; - Mutex reap_lock; + std::mutex reap_lock; list removed_collections; @@ -564,9 +567,9 @@ private: void _kv_sync_thread(); void _kv_stop() { { - Mutex::Locker l(kv_lock); + std::lock_guard l(kv_lock); kv_stop = true; - kv_cond.Signal(); + kv_cond.notify_all(); } kv_sync_thread.join(); kv_stop = false;