oid(o),
key(k),
dirty(false),
- exists(true),
- flush_lock("BlueStore::Onode::flush_lock") {
+ exists(true) {
}
void BlueStore::Onode::flush()
{
- Mutex::Locker l(flush_lock);
+ std::unique_lock<std::mutex> l(flush_lock);
dout(20) << __func__ << " " << flush_txns << dendl;
while (!flush_txns.empty())
- flush_cond.Wait(flush_lock);
+ flush_cond.wait(l);
dout(20) << __func__ << " done" << dendl;
}
void BlueStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
{
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
dout(30) << __func__ << " " << oid << " " << o << dendl;
assert(onode_map.count(oid) == 0);
onode_map[oid] = o;
BlueStore::OnodeRef BlueStore::OnodeHashLRU::lookup(const ghobject_t& oid)
{
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
dout(30) << __func__ << dendl;
ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
if (p == onode_map.end()) {
void BlueStore::OnodeHashLRU::clear()
{
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
dout(10) << __func__ << dendl;
lru.clear();
onode_map.clear();
void BlueStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
const ghobject_t& new_oid)
{
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
po = onode_map.find(old_oid);
const ghobject_t& after,
pair<ghobject_t,OnodeRef> *next)
{
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
dout(20) << __func__ << " after " << after << dendl;
if (after == ghobject_t()) {
int BlueStore::OnodeHashLRU::trim(int max)
{
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
dout(20) << __func__ << " max " << max
<< " size " << onode_map.size() << dendl;
int trimmed = 0;
fsid_fd(-1),
mounted(false),
coll_lock("BlueStore::coll_lock"),
- nid_lock("BlueStore::nid_lock"),
nid_max(0),
throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops),
throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes),
throttle_wal_bytes(cct, "bluestore_wal_max_bytes",
cct->_conf->bluestore_max_bytes +
cct->_conf->bluestore_wal_max_bytes),
- wal_lock("BlueStore::wal_lock"),
wal_seq(0),
wal_tp(cct,
"BlueStore::wal_tp",
&wal_tp),
finisher(cct),
kv_sync_thread(this),
- kv_lock("BlueStore::kv_lock"),
kv_stop(false),
- logger(NULL),
- reap_lock("BlueStore::reap_lock")
+ logger(NULL)
{
_init_logger();
}
// flush aios in flght
bdev->flush();
- kv_lock.Lock();
+ std::unique_lock<std::mutex> l(kv_lock);
while (!kv_committing.empty() ||
!kv_queue.empty()) {
dout(20) << " waiting for kv to commit" << dendl;
- kv_sync_cond.Wait(kv_lock);
+ kv_sync_cond.wait(l);
}
- kv_lock.Unlock();
dout(10) << __func__ << " done" << dendl;
}
void BlueStore::_queue_reap_collection(CollectionRef& c)
{
dout(10) << __func__ << " " << c->cid << dendl;
- Mutex::Locker l(reap_lock);
+ std::lock_guard<std::mutex> l(reap_lock);
removed_collections.push_back(c);
}
void BlueStore::_reap_collections()
{
- reap_lock.Lock();
-
list<CollectionRef> removed_colls;
- removed_colls.swap(removed_collections);
- reap_lock.Unlock();
+ {
+ std::lock_guard<std::mutex> l(reap_lock);
+ removed_colls.swap(removed_collections);
+ }
for (list<CollectionRef>::iterator p = removed_colls.begin();
p != removed_colls.end();
if (r < 0) {
goto out;
}
+ r = r_len;
bufferlist u;
u.substr_of(t, front_extra, x_len);
bl.claim_append(u);
{
if (o->onode.nid)
return;
- Mutex::Locker l(nid_lock);
+ std::lock_guard<std::mutex> l(nid_lock);
o->onode.nid = ++nid_last;
dout(20) << __func__ << " " << o->onode.nid << dendl;
if (nid_last > nid_max) {
return;
case TransContext::STATE_IO_DONE:
- assert(txc->osr->qlock.is_locked()); // see _txc_finish_io
+ //assert(txc->osr->qlock.is_locked()); // see _txc_finish_io
txc->state = TransContext::STATE_KV_QUEUED;
if (!g_conf->bluestore_sync_transaction) {
- Mutex::Locker l(kv_lock);
+ std::lock_guard<std::mutex> l(kv_lock);
if (g_conf->bluestore_sync_submit_transaction) {
db->submit_transaction(txc->t);
}
kv_queue.push_back(txc);
- kv_cond.SignalOne();
+ kv_cond.notify_one();
return;
}
db->submit_transaction_sync(txc->t);
*/
OpSequencer *osr = txc->osr.get();
- Mutex::Locker l(osr->qlock);
+ std::lock_guard<std::mutex> l(osr->qlock);
txc->state = TransContext::STATE_IO_DONE;
OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
dout(20) << " onode " << (*p)->oid << " is " << bl.length() << dendl;
txc->t->set(PREFIX_OBJ, (*p)->key, bl);
- Mutex::Locker l((*p)->flush_lock);
+ std::lock_guard<std::mutex> l((*p)->flush_lock);
(*p)->flush_txns.insert(txc);
}
for (set<OnodeRef>::iterator p = txc->onodes.begin();
p != txc->onodes.end();
++p) {
- Mutex::Locker l((*p)->flush_lock);
+ std::lock_guard<std::mutex> l((*p)->flush_lock);
dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
<< dendl;
assert((*p)->flush_txns.count(txc));
(*p)->flush_txns.erase(txc);
if ((*p)->flush_txns.empty())
- (*p)->flush_cond.Signal();
+ (*p)->flush_cond.notify_all();
}
// clear out refs
throttle_wal_bytes.put(txc->bytes);
OpSequencerRef osr = txc->osr;
- osr->qlock.Lock();
- txc->state = TransContext::STATE_DONE;
- osr->qlock.Unlock();
+ {
+ std::lock_guard<std::mutex> l(osr->qlock);
+ txc->state = TransContext::STATE_DONE;
+ }
_osr_reap_done(osr.get());
}
void BlueStore::_osr_reap_done(OpSequencer *osr)
{
- Mutex::Locker l(osr->qlock);
+ std::lock_guard<std::mutex> l(osr->qlock);
dout(20) << __func__ << " osr " << osr << dendl;
while (!osr->q.empty()) {
TransContext *txc = &osr->q.front();
osr->q.pop_front();
delete txc;
- osr->qcond.Signal();
+ osr->qcond.notify_all();
if (osr->q.empty())
dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
}
void BlueStore::_kv_sync_thread()
{
dout(10) << __func__ << " start" << dendl;
- kv_lock.Lock();
+ std::unique_lock<std::mutex> l(kv_lock);
while (true) {
assert(kv_committing.empty());
assert(wal_cleaning.empty());
if (kv_stop)
break;
dout(20) << __func__ << " sleep" << dendl;
- kv_sync_cond.Signal();
- kv_cond.Wait(kv_lock);
+ kv_sync_cond.notify_all();
+ kv_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
dout(20) << __func__ << " committing " << kv_queue.size()
kv_committing.swap(kv_queue);
wal_cleaning.swap(wal_cleanup_queue);
utime_t start = ceph_clock_now(NULL);
- kv_lock.Unlock();
+ l.unlock();
dout(30) << __func__ << " committing txc " << kv_committing << dendl;
dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;
}
}
- kv_lock.Lock();
+ l.lock();
}
}
- kv_lock.Unlock();
dout(10) << __func__ << " finish" << dendl;
}
bluestore_wal_transaction_t& wt = *txc->wal_txn;
dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
- Mutex::Locker l(kv_lock);
+ std::lock_guard<std::mutex> l(kv_lock);
txc->state = TransContext::STATE_WAL_CLEANUP;
wal_cleanup_queue.push_back(txc);
- kv_cond.SignalOne();
+ kv_cond.notify_one();
return 0;
}
#include <unistd.h>
+#include <mutex>
+#include <condition_variable>
+
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/unordered_set.hpp>
#include <boost/functional/hash.hpp>
bool dirty; // ???
bool exists;
- Mutex flush_lock; ///< protect flush_txns
- Cond flush_cond; ///< wait here for unapplied txns
+ std::mutex flush_lock; ///< protect flush_txns
+ std::condition_variable flush_cond; ///< wait here for unapplied txns
set<TransContext*> flush_txns; ///< committing or wal txns
uint64_t tail_offset;
boost::intrusive::list_member_hook<>,
&Onode::lru_item> > lru_list_t;
- Mutex lock;
+ std::mutex lock;
ceph::unordered_map<ghobject_t,OnodeRef> onode_map; ///< forward lookups
lru_list_t lru; ///< lru
- OnodeHashLRU() : lock("BlueStore::OnodeHashLRU::lock") {}
+ OnodeHashLRU() {}
void add(const ghobject_t& oid, OnodeRef o);
void _touch(OnodeRef o);
class OpSequencer : public Sequencer_impl {
public:
- Mutex qlock;
- Cond qcond;
+ std::mutex qlock;
+ std::condition_variable qcond;
typedef boost::intrusive::list<
TransContext,
boost::intrusive::member_hook<
Sequencer *parent;
- Mutex wal_apply_lock;
+ std::mutex wal_apply_mutex;
+ std::unique_lock<std::mutex> wal_apply_lock;
OpSequencer()
//set the qlock to to PTHREAD_MUTEX_RECURSIVE mode
- : qlock("BlueStore::OpSequencer::qlock", true, false),
- parent(NULL),
- wal_apply_lock("BlueStore::OpSequencer::wal_apply_lock") {
+ : parent(NULL),
+ wal_apply_lock(wal_apply_mutex, std::defer_lock) {
}
~OpSequencer() {
assert(q.empty());
}
void queue_new(TransContext *txc) {
- Mutex::Locker l(qlock);
+ std::lock_guard<std::mutex> l(qlock);
q.push_back(*txc);
}
void flush() {
- Mutex::Locker l(qlock);
+ std::unique_lock<std::mutex> l(qlock);
while (!q.empty())
- qcond.Wait(qlock);
+ qcond.wait(l);
}
bool flush_commit(Context *c) {
- Mutex::Locker l(qlock);
+ std::lock_guard<std::mutex> l(qlock);
if (q.empty()) {
return true;
}
// preserve wal ordering for this sequencer by taking the lock
// while still holding the queue lock
- i->osr->wal_apply_lock.Lock();
+ i->osr->wal_apply_lock.lock();
return i;
}
void _process(TransContext *i, ThreadPool::TPHandle &handle) {
store->_wal_apply(i);
- i->osr->wal_apply_lock.Unlock();
+ i->osr->wal_apply_lock.unlock();
}
using ThreadPool::WorkQueue<TransContext>::_process;
void _clear() {
RWLock coll_lock; ///< rwlock to protect coll_map
ceph::unordered_map<coll_t, CollectionRef> coll_map;
- Mutex nid_lock;
+ std::mutex nid_lock;
uint64_t nid_last;
uint64_t nid_max;
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
- Mutex wal_lock;
+ std::mutex wal_lock;
atomic64_t wal_seq;
ThreadPool wal_tp;
WALWQ wal_wq;
Finisher finisher;
KVSyncThread kv_sync_thread;
- Mutex kv_lock;
- Cond kv_cond, kv_sync_cond;
+ std::mutex kv_lock;
+ std::condition_variable kv_cond, kv_sync_cond;
bool kv_stop;
deque<TransContext*> kv_queue, kv_committing;
deque<TransContext*> wal_cleanup_queue, wal_cleaning;
Logger *logger;
- Mutex reap_lock;
+ std::mutex reap_lock;
list<CollectionRef> removed_collections;
void _kv_sync_thread();
void _kv_stop() {
{
- Mutex::Locker l(kv_lock);
+ std::lock_guard<std::mutex> l(kv_lock);
kv_stop = true;
- kv_cond.Signal();
+ kv_cond.notify_all();
}
kv_sync_thread.join();
kv_stop = false;