]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 03:15:41 +0000 (11:15 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 03:27:18 +0000 (11:27 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
20 files changed:
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/os/filestore/CollectionIndex.h
src/os/filestore/DBObjectMap.cc
src/os/filestore/DBObjectMap.h
src/os/filestore/FDCache.h
src/os/filestore/FileJournal.cc
src/os/filestore/FileJournal.h
src/os/filestore/FileStore.cc
src/os/filestore/FileStore.h
src/os/filestore/IndexManager.cc
src/os/filestore/IndexManager.h
src/os/filestore/Journal.h
src/os/filestore/JournalingObjectStore.cc
src/os/filestore/JournalingObjectStore.h
src/os/filestore/WBThrottle.cc
src/os/filestore/WBThrottle.h
src/os/fs/FS.h
src/os/kstore/KStore.cc
src/os/kstore/KStore.h

index 9233e159cf4589678371011a6f96153ba272749d..dd4e1106c28924383111946ea0e0259d7af27527 100644 (file)
@@ -32,6 +32,7 @@
 #include "common/errno.h"
 #include "common/safe_io.h"
 #include "common/PriorityCache.h"
+#include "common/RWLock.h"
 #include "Allocator.h"
 #include "FreelistManager.h"
 #include "BlueFS.h"
@@ -3447,7 +3448,6 @@ BlueStore::Collection::Collection(BlueStore *store_, OnodeCacheShard *oc, Buffer
   : CollectionImpl(cid),
     store(store_),
     cache(bc),
-    lock("BlueStore::Collection::lock", true, false),
     exists(true),
     onode_map(oc),
     commit_queue(nullptr)
@@ -3558,7 +3558,7 @@ BlueStore::OnodeRef BlueStore::Collection::get_onode(
   bool create,
   bool is_createop)
 {
-  ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
+  ceph_assert(create ? ceph_mutex_is_wlocked(lock) : ceph_mutex_is_locked(lock));
 
   spg_t pgid;
   if (cid.is_pg(&pgid)) {
@@ -3703,7 +3703,7 @@ void BlueStore::Collection::split_cache(
 
 void *BlueStore::MempoolThread::entry()
 {
-  std::unique_lock l(lock);
+  std::unique_lock l{lock};
 
   uint64_t base = store->osd_memory_base;
   double fragmentation = store->osd_memory_expected_fragmentation;
@@ -3848,7 +3848,7 @@ BlueStore::OmapIteratorImpl::OmapIteratorImpl(
   CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
   : c(c), o(o), it(it)
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   if (o->onode.has_omap()) {
     get_omap_key(o->onode.nid, string(), &head);
     get_omap_tail(o->onode.nid, &tail);
@@ -3866,7 +3866,7 @@ string BlueStore::OmapIteratorImpl::_stringify() const
 
 int BlueStore::OmapIteratorImpl::seek_to_first()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   auto start1 = mono_clock::now();
   if (o->onode.has_omap()) {
     it->lower_bound(head);
@@ -3884,7 +3884,7 @@ int BlueStore::OmapIteratorImpl::seek_to_first()
 
 int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   auto start1 = mono_clock::now();
   if (o->onode.has_omap()) {
     string key;
@@ -3910,7 +3910,7 @@ int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
 
 int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   auto start1 = mono_clock::now();
   if (o->onode.has_omap()) {
     string key;
@@ -3936,7 +3936,7 @@ int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
 
 bool BlueStore::OmapIteratorImpl::valid()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   bool r = o->onode.has_omap() && it && it->valid() &&
     it->raw_key().second < tail;
   if (it && it->valid()) {
@@ -3950,7 +3950,7 @@ bool BlueStore::OmapIteratorImpl::valid()
 int BlueStore::OmapIteratorImpl::next()
 {
   int r = -1;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   auto start1 = mono_clock::now();
   if (o->onode.has_omap()) {
     it->next();
@@ -3967,7 +3967,7 @@ int BlueStore::OmapIteratorImpl::next()
 
 string BlueStore::OmapIteratorImpl::key()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   ceph_assert(it->valid());
   string db_key = it->raw_key().second;
   string user_key;
@@ -3978,7 +3978,7 @@ string BlueStore::OmapIteratorImpl::key()
 
 bufferlist BlueStore::OmapIteratorImpl::value()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   ceph_assert(it->valid());
   return it->value();
 }
@@ -7245,7 +7245,7 @@ int BlueStore::_fsck(bool deep, bool repair)
 
       dout(10) << __func__ << "  " << oid << dendl;
       store_statfs_t onode_statfs;
-      RWLock::RLocker l(c->lock);
+      std::shared_lock l(c->lock);
       OnodeRef o = c->get_onode(oid, false);
       if (o->onode.nid) {
        if (o->onode.nid > nid_max) {
@@ -7595,7 +7595,7 @@ int BlueStore::_fsck(bool deep, bool repair)
        dout(20) << __func__ << " check misreference for col:" << c->cid
                  << " obj:" << oid << dendl;
 
-       RWLock::RLocker l(c->lock);
+       std::shared_lock l(c->lock);
        OnodeRef o = c->get_onode(oid, false);
        o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
        mempool::bluestore_fsck::set<BlobRef> blobs;
@@ -7994,7 +7994,7 @@ void BlueStore::inject_false_free(coll_t cid, ghobject_t oid)
   CollectionRef c = _get_collection(cid);
   ceph_assert(c);
   {
-    RWLock::WLocker l(c->lock); // just to avoid internal asserts
+    std::unique_lock l{c->lock}; // just to avoid internal asserts
     o = c->get_onode(oid, false);
     ceph_assert(o);
     o->extent_map.fault_range(db, 0, OBJECT_MAX_SIZE);
@@ -8046,7 +8046,7 @@ void BlueStore::inject_misreference(coll_t cid1, ghobject_t oid1,
   CollectionRef c1 = _get_collection(cid1);
   ceph_assert(c1);
   {
-    RWLock::WLocker l(c1->lock); // just to avoid internal asserts
+    std::unique_lock l{c1->lock}; // just to avoid internal asserts
     o1 = c1->get_onode(oid1, false);
     ceph_assert(o1);
     o1->extent_map.fault_range(db, offset, OBJECT_MAX_SIZE);
@@ -8055,7 +8055,7 @@ void BlueStore::inject_misreference(coll_t cid1, ghobject_t oid1,
   CollectionRef c2 = _get_collection(cid2);
   ceph_assert(c2);
   {
-    RWLock::WLocker l(c2->lock); // just to avoid internal asserts
+    std::unique_lock l{c2->lock}; // just to avoid internal asserts
     o2 = c2->get_onode(oid2, false);
     ceph_assert(o2);
     o2->extent_map.fault_range(db, offset, OBJECT_MAX_SIZE);
@@ -8302,7 +8302,7 @@ void BlueStore::_check_legacy_statfs_alert()
 
 BlueStore::CollectionRef BlueStore::_get_collection(const coll_t& cid)
 {
-  RWLock::RLocker l(coll_lock);
+  std::shared_lock l(coll_lock);
   ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
   if (cp == coll_map.end())
     return CollectionRef();
@@ -8389,7 +8389,7 @@ ObjectStore::CollectionHandle BlueStore::open_collection(const coll_t& cid)
 ObjectStore::CollectionHandle BlueStore::create_new_collection(
   const coll_t& cid)
 {
-  RWLock::WLocker l(coll_lock);
+  std::unique_lock l{coll_lock};
   Collection *c = new Collection(
     this,
     onode_cache_shards[cid.hash_to_shard(onode_cache_shards.size())],
@@ -8405,7 +8405,7 @@ void BlueStore::set_collection_commit_queue(
     ContextQueue *commit_queue)
 {
   if (commit_queue) {
-    RWLock::RLocker l(coll_lock);
+    std::shared_lock l(coll_lock);
     if (coll_map.count(cid)) {
       coll_map[cid]->commit_queue = commit_queue;
     } else if (new_coll_map.count(cid)) {
@@ -8425,7 +8425,7 @@ bool BlueStore::exists(CollectionHandle &c_, const ghobject_t& oid)
   bool r = true;
 
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists)
       r = false;
@@ -8446,7 +8446,7 @@ int BlueStore::stat(
   dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
 
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists)
       return -ENOENT;
@@ -8471,7 +8471,7 @@ int BlueStore::set_collection_opts(
   dout(15) << __func__ << " " << ch->cid << " options " << opts << dendl;
   if (!c->exists)
     return -ENOENT;
-  RWLock::WLocker l(c->lock);
+  std::unique_lock l{c->lock};
   c->pool_opts = opts;
   return 0;
 }
@@ -8496,7 +8496,7 @@ int BlueStore::read(
   bl.clear();
   int r;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
     auto start1 = mono_clock::now();
     OnodeRef o = c->get_onode(oid, false);
     log_latency("get_onode@read",
@@ -9033,7 +9033,7 @@ int BlueStore::_fiemap(
   if (!c->exists)
     return -ENOENT;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
 
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists) {
@@ -9133,7 +9133,7 @@ int BlueStore::dump_onode(CollectionHandle &c_,
 
   int r;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
 
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists) {
@@ -9170,7 +9170,7 @@ int BlueStore::getattr(
 
   int r;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
     mempool::bluestore_cache_other::string k(name);
 
     OnodeRef o = c->get_onode(oid, false);
@@ -9208,7 +9208,7 @@ int BlueStore::getattrs(
 
   int r;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
 
     OnodeRef o = c->get_onode(oid, false);
     if (!o || !o->exists) {
@@ -9233,7 +9233,7 @@ int BlueStore::getattrs(
 
 int BlueStore::list_collections(vector<coll_t>& ls)
 {
-  RWLock::RLocker l(coll_lock);
+  std::shared_lock l(coll_lock);
   ls.reserve(coll_map.size());
   for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
        p != coll_map.end();
@@ -9244,7 +9244,7 @@ int BlueStore::list_collections(vector<coll_t>& ls)
 
 bool BlueStore::collection_exists(const coll_t& c)
 {
-  RWLock::RLocker l(coll_lock);
+  std::shared_lock l(coll_lock);
   return coll_map.count(c);
 }
 
@@ -9269,7 +9269,7 @@ int BlueStore::collection_bits(CollectionHandle& ch)
 {
   dout(15) << __func__ << " " << ch->cid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
   return c->cnode.bits;
 }
@@ -9284,7 +9284,7 @@ int BlueStore::collection_list(
            << " start " << start << " end " << end << " max " << max << dendl;
   int r;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l(c->lock);
     r = _collection_list(c, start, end, max, ls, pnext);
   }
 
@@ -9431,7 +9431,7 @@ int BlueStore::omap_get(
   dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
   if (!c->exists)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -9483,7 +9483,7 @@ int BlueStore::omap_get_header(
   dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
   if (!c->exists)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -9519,7 +9519,7 @@ int BlueStore::omap_get_keys(
   dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
   if (!c->exists)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -9567,7 +9567,7 @@ int BlueStore::omap_get_values(
   dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
   if (!c->exists)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   int r = 0;
   string final_key;
   OnodeRef o = c->get_onode(oid, false);
@@ -9611,7 +9611,7 @@ int BlueStore::omap_check_keys(
   dout(15) << __func__ << " " << c->get_cid() << " oid " << oid << dendl;
   if (!c->exists)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   int r = 0;
   string final_key;
   OnodeRef o = c->get_onode(oid, false);
@@ -9657,7 +9657,7 @@ ObjectMap::ObjectMapIterator BlueStore::get_omap_iterator(
   if (!c->exists) {
     return ObjectMap::ObjectMapIterator();
   }
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l(c->lock);
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
     dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
@@ -10438,7 +10438,7 @@ void BlueStore::_osr_drain_all()
   set<OpSequencerRef> s;
   vector<OpSequencerRef> zombies;
   {
-    RWLock::RLocker l(coll_lock);
+    std::shared_lock l(coll_lock);
     for (auto& i : coll_map) {
       s.insert(i.second->osr);
     }
@@ -10507,7 +10507,7 @@ void BlueStore::_kv_stop()
 {
   dout(10) << __func__ << dendl;
   {
-    std::unique_lock l(kv_lock);
+    std::unique_lock l{kv_lock};
     while (!kv_sync_started) {
       kv_cond.wait(l);
     }
@@ -10515,7 +10515,7 @@ void BlueStore::_kv_stop()
     kv_cond.notify_all();
   }
   {
-    std::unique_lock l(kv_finalize_lock);
+    std::unique_lock l{kv_finalize_lock};
     while (!kv_finalize_started) {
       kv_finalize_cond.wait(l);
     }
@@ -10545,7 +10545,7 @@ void BlueStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
   deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
-  std::unique_lock l(kv_lock);
+  std::unique_lock l{kv_lock};
   ceph_assert(!kv_sync_started);
   kv_sync_started = true;
   kv_cond.notify_all();
@@ -10703,7 +10703,7 @@ void BlueStore::_kv_sync_thread()
       ceph_assert(r == 0);
 
       {
-       std::unique_lock m(kv_finalize_lock);
+       std::unique_lock m{kv_finalize_lock};
        if (kv_committing_to_finalize.empty()) {
          kv_committing_to_finalize.swap(kv_committing);
        } else {
@@ -11338,7 +11338,7 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
     }
 
     // object operations
-    RWLock::WLocker l(c->lock);
+    std::unique_lock l(c->lock);
     OnodeRef &o = ovec[op->oid];
     if (!o) {
       ghobject_t oid = i.get_oid(op->oid);
@@ -13535,7 +13535,7 @@ int BlueStore::_create_collection(
   bufferlist bl;
 
   {
-    RWLock::WLocker l(coll_lock);
+    std::unique_lock l(coll_lock);
     if (*c) {
       r = -EEXIST;
       goto out;
@@ -13564,7 +13564,7 @@ int BlueStore::_remove_collection(TransContext *txc, const coll_t &cid,
 
   (*c)->flush_all_but_last();
   {
-    RWLock::WLocker l(coll_lock);
+    std::unique_lock l(coll_lock);
     if (!*c) {
       r = -ENOENT;
       goto out;
@@ -13640,8 +13640,8 @@ int BlueStore::_split_collection(TransContext *txc,
 {
   dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
           << " bits " << bits << dendl;
-  RWLock::WLocker l(c->lock);
-  RWLock::WLocker l2(d->lock);
+  std::unique_lock l(c->lock);
+  std::unique_lock l2(d->lock);
   int r;
 
   // flush all previous deferred writes on this sequencer.  this is a bit
@@ -13693,8 +13693,8 @@ int BlueStore::_merge_collection(
 {
   dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid
           << " bits " << bits << dendl;
-  RWLock::WLocker l((*c)->lock);
-  RWLock::WLocker l2(d->lock);
+  std::unique_lock l((*c)->lock);
+  std::unique_lock l2(d->lock);
   int r;
 
   coll_t cid = (*c)->cid;
@@ -13726,7 +13726,7 @@ int BlueStore::_merge_collection(
 
   // remove source collection
   {
-    RWLock::WLocker l3(coll_lock);
+    std::unique_lock l3(coll_lock);
     _do_remove_collection(txc, c);
   }
 
index 32ead8a614af5a867d1961fa5369df6706ebfff5..c16e41dab88215135a3c8ffa2594363864a8cafc 100644 (file)
@@ -34,6 +34,7 @@
 #include "include/mempool.h"
 #include "common/bloom_filter.hpp"
 #include "common/Finisher.h"
+#include "common/ceph_mutex.h"
 #include "common/Throttle.h"
 #include "common/perf_counters.h"
 #include "common/PriorityCache.h"
@@ -1234,7 +1235,8 @@ public:
     OpSequencerRef osr;
     BufferCacheShard *cache;       ///< our cache shard
     bluestore_cnode_t cnode;
-    RWLock lock;
+    ceph::shared_mutex lock =
+      ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
 
     bool exists;
 
@@ -1768,7 +1770,7 @@ private:
   int fsid_fd = -1;  ///< open handle (locked) to $path/fsid
   bool mounted = false;
 
-  RWLock coll_lock = {"BlueStore::coll_lock"};  ///< rwlock to protect coll_map
+  ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock");  ///< rwlock to protect coll_map
   mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
   map<coll_t,CollectionRef> new_coll_map;
 
@@ -1822,7 +1824,8 @@ private:
 
   list<CollectionRef> removed_collections;
 
-  RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
+  ceph::shared_mutex debug_read_error_lock =
+    ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
   set<ghobject_t> debug_data_error_objects;
   set<ghobject_t> debug_mdata_error_objects;
 
@@ -2512,11 +2515,11 @@ public:
 
   // error injection
   void inject_data_error(const ghobject_t& o) override {
-    RWLock::WLocker l(debug_read_error_lock);
+    std::unique_lock l(debug_read_error_lock);
     debug_data_error_objects.insert(o);
   }
   void inject_mdata_error(const ghobject_t& o) override {
-    RWLock::WLocker l(debug_read_error_lock);
+    std::unique_lock l(debug_read_error_lock);
     debug_mdata_error_objects.insert(o);
   }
 
@@ -2565,19 +2568,19 @@ private:
     if (!cct->_conf->bluestore_debug_inject_read_err) {
       return false;
     }
-    RWLock::RLocker l(debug_read_error_lock);
+    std::shared_lock l(debug_read_error_lock);
     return debug_data_error_objects.count(o);
   }
   bool _debug_mdata_eio(const ghobject_t& o) {
     if (!cct->_conf->bluestore_debug_inject_read_err) {
       return false;
     }
-    RWLock::RLocker l(debug_read_error_lock);
+    std::shared_lock l(debug_read_error_lock);
     return debug_mdata_error_objects.count(o);
   }
   void _debug_obj_on_delete(const ghobject_t& o) {
     if (cct->_conf->bluestore_debug_inject_read_err) {
-      RWLock::WLocker l(debug_read_error_lock);
+      std::unique_lock l(debug_read_error_lock);
       debug_data_error_objects.erase(o);
       debug_mdata_error_objects.erase(o);
     }
index eb43e47dc53080086ddb958eec7d9bef8bfbfb37..d01901ab3e87a67d285cff6b02956794690a8451 100644 (file)
@@ -74,7 +74,8 @@ protected:
   };
  public:
 
-  RWLock access_lock;
+  ceph::shared_mutex access_lock =
+    ceph::make_shared_mutex("CollectionIndex::access_lock", true, false);
   /// Type of returned paths
   typedef std::shared_ptr<Path> IndexedPath;
 
@@ -181,7 +182,7 @@ protected:
   virtual int prep_delete() { return 0; }
 
   CollectionIndex(CephContext* cct, const coll_t& collection)
-    : cct(cct), access_lock("CollectionIndex::access_lock", true, false) {}
+    : cct(cct) {}
 
   /*
    * Pre-hash the collection, this collection should map to a PG folder.
index 5a057014042679df4ebfbf691c50dd9bcf2159da..a0bf758ece498475700e45e91e771031aebbe478 100644 (file)
@@ -1040,7 +1040,7 @@ int DBObjectMap::upgrade_to_v2()
 
 void DBObjectMap::set_state()
 {
-  Mutex::Locker l(header_lock);
+  std::lock_guard l{header_lock};
   KeyValueDB::Transaction t = db->get_transaction();
   write_state(t);
   int ret = db->submit_transaction_sync(t);
@@ -1122,18 +1122,18 @@ int DBObjectMap::sync(const ghobject_t *oid,
      *
      * See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
      */
-    Mutex::Locker l(header_lock);
+    std::lock_guard l{header_lock};
     write_state(t);
     return db->submit_transaction_sync(t);
   } else {
-    Mutex::Locker l(header_lock);
+    std::lock_guard l{header_lock};
     write_state(t);
     return db->submit_transaction_sync(t);
   }
 }
 
 int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
-  ceph_assert(header_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(header_lock));
   dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
   KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
   bufferlist bl;
@@ -1153,7 +1153,7 @@ DBObjectMap::Header DBObjectMap::_lookup_map_header(
 
   _Header *header = new _Header();
   {
-    Mutex::Locker l(cache_lock);
+    std::lock_guard l{cache_lock};
     if (caches.lookup(oid, header)) {
       ceph_assert(!in_use.count(header->seq));
       in_use.insert(header->seq);
@@ -1172,7 +1172,7 @@ DBObjectMap::Header DBObjectMap::_lookup_map_header(
   auto iter = out.cbegin();
   ret->decode(iter);
   {
-    Mutex::Locker l(cache_lock);
+    std::lock_guard l{cache_lock};
     caches.add(oid, *ret);
   }
 
@@ -1201,9 +1201,8 @@ DBObjectMap::Header DBObjectMap::_generate_new_header(const ghobject_t &oid,
 
 DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
 {
-  Mutex::Locker l(header_lock);
-  while (in_use.count(input->parent))
-    header_cond.Wait(header_lock);
+  std::unique_lock l{header_lock};
+  header_cond.wait(l, [&input, this] { return !in_use.count(input->parent); });
   map<string, bufferlist> out;
   set<string> keys;
   keys.insert(HEADER_KEY);
@@ -1235,7 +1234,7 @@ DBObjectMap::Header DBObjectMap::lookup_create_map_header(
   const ghobject_t &oid,
   KeyValueDB::Transaction t)
 {
-  Mutex::Locker l(header_lock);
+  std::lock_guard l{header_lock};
   Header header = _lookup_map_header(hl, oid);
   if (!header) {
     header = _generate_new_header(oid, Header());
@@ -1278,7 +1277,7 @@ void DBObjectMap::remove_map_header(
   to_remove.insert(map_header_key(oid));
   t->rmkeys(HOBJECT_TO_SEQ, to_remove);
   {
-    Mutex::Locker l(cache_lock);
+    std::lock_guard l{cache_lock};
     caches.clear(oid);
   }
 }
@@ -1296,7 +1295,7 @@ void DBObjectMap::set_map_header(
   header.encode(to_set[map_header_key(oid)]);
   t->set(HOBJECT_TO_SEQ, to_set);
   {
-    Mutex::Locker l(cache_lock);
+    std::lock_guard l{cache_lock};
     caches.add(oid, header);
   }
 }
index 43855246ffba8e6170c26543c437bd751f4e3d60..a217e8cf908808f618c6491156f27ab631aa575d 100644 (file)
@@ -13,8 +13,7 @@
 #include "os/ObjectMap.h"
 #include "kv/KeyValueDB.h"
 #include "osd/osd_types.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
 #include "common/simple_cache.hpp"
 #include <boost/optional/optional_io.hpp>
 
@@ -62,9 +61,9 @@ public:
   /**
    * Serializes access to next_seq as well as the in_use set
    */
-  Mutex header_lock;
-  Cond header_cond;
-  Cond map_header_cond;
+  ceph::mutex header_lock = ceph::make_mutex("DBOBjectMap");
+  ceph::condition_variable header_cond;
+  ceph::condition_variable map_header_cond;
 
   /**
    * Set of headers currently in use
@@ -85,9 +84,10 @@ public:
   public:
     explicit MapHeaderLock(DBObjectMap *db) : db(db) {}
     MapHeaderLock(DBObjectMap *db, const ghobject_t &oid) : db(db), locked(oid) {
-      Mutex::Locker l(db->header_lock);
-      while (db->map_header_in_use.count(*locked))
-       db->map_header_cond.Wait(db->header_lock);
+      std::unique_lock l{db->header_lock};
+      db->map_header_cond.wait(l, [db, this] {
+        return !db->map_header_in_use.count(*locked);
+      });
       db->map_header_in_use.insert(*locked);
     }
 
@@ -107,17 +107,16 @@ public:
 
     ~MapHeaderLock() {
       if (locked) {
-       Mutex::Locker l(db->header_lock);
+       std::lock_guard l{db->header_lock};
        ceph_assert(db->map_header_in_use.count(*locked));
-       db->map_header_cond.Signal();
+       db->map_header_cond.notify_all();
        db->map_header_in_use.erase(*locked);
       }
     }
   };
 
   DBObjectMap(CephContext* cct, KeyValueDB *db)
-    : ObjectMap(cct, db), header_lock("DBOBjectMap"),
-      cache_lock("DBObjectMap::CacheLock"),
+    : ObjectMap(cct, db),
       caches(cct->_conf->filestore_omap_header_cache_size)
     {}
 
@@ -366,7 +365,7 @@ public:
 private:
   /// Implicit lock on Header->seq
   typedef std::shared_ptr<_Header> Header;
-  Mutex cache_lock;
+  ceph::mutex cache_lock = ceph::make_mutex("DBObjectMap::CacheLock");
   SimpleLRU<ghobject_t, _Header> caches;
 
   string map_header_key(const ghobject_t &oid);
@@ -500,7 +499,7 @@ private:
    */
   Header _generate_new_header(const ghobject_t &oid, Header parent);
   Header generate_new_header(const ghobject_t &oid, Header parent) {
-    Mutex::Locker l(header_lock);
+    std::lock_guard l{header_lock};
     return _generate_new_header(oid, parent);
   }
 
@@ -511,7 +510,7 @@ private:
   Header lookup_map_header(
     const MapHeaderLock &l2,
     const ghobject_t &oid) {
-    Mutex::Locker l(header_lock);
+    std::lock_guard l{header_lock};
     return _lookup_map_header(l2, oid);
   }
 
@@ -564,10 +563,10 @@ private:
     explicit RemoveOnDelete(DBObjectMap *db) :
       db(db) {}
     void operator() (_Header *header) {
-      Mutex::Locker l(db->header_lock);
+      std::lock_guard l{db->header_lock};
       ceph_assert(db->in_use.count(header->seq));
       db->in_use.erase(header->seq);
-      db->header_cond.Signal();
+      db->header_cond.notify_all();
       delete header;
     }
   };
index ee8c4fb0d5769752ab85d06724e281200672e5fa..a7d90c0e6525b837cf9a0ffa839712db17df3dfc 100644 (file)
@@ -20,8 +20,6 @@
 #include <cstdio>
 #include "common/config_obs.h"
 #include "common/hobject.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
 #include "common/shared_cache.hpp"
 #include "include/compat.h"
 #include "include/intarith.h"
index f0351fe46273a217bf86343903064bd535e68c77..967c4c1b0c4554f3dbd728c78a7862f8bb59df4e 100644 (file)
@@ -628,13 +628,13 @@ void FileJournal::stop_writer()
   if (!write_stop)
   {
     {
-      Mutex::Locker l(write_lock);
-      Mutex::Locker p(writeq_lock);
+      std::lock_guard l{write_lock};
+      std::lock_guard p{writeq_lock};
       write_stop = true;
-      writeq_cond.Signal();
+      writeq_cond.notify_all();
       // Doesn't hurt to signal commit_cond in case thread is waiting there
       // and caller didn't use committed_thru() first.
-      commit_cond.Signal();
+      commit_cond.notify_all();
     }
     write_thread.join();
 
@@ -646,11 +646,11 @@ void FileJournal::stop_writer()
   // stop aio completeion thread *after* writer thread has stopped
   // and has submitted all of its io
   if (aio && !aio_stop) {
-    aio_lock.Lock();
+    aio_lock.lock();
     aio_stop = true;
-    aio_cond.Signal();
-    write_finish_cond.Signal();
-    aio_lock.Unlock();
+    aio_cond.notify_all();
+    write_finish_cond.notify_all();
+    aio_lock.unlock();
     write_finish_thread.join();
   }
 #endif
@@ -724,7 +724,7 @@ bufferptr FileJournal::prepare_header()
 {
   bufferlist bl;
   {
-    Mutex::Locker l(finisher_lock);
+    std::lock_guard l{finisher_lock};
     header.committed_up_to = journaled_seq;
   }
   encode(header, bl);
@@ -740,7 +740,7 @@ bufferptr FileJournal::prepare_header()
 
 void FileJournal::write_header_sync()
 {
-  Mutex::Locker locker(write_lock);
+  std::lock_guard locker{write_lock};
   must_write_header = true;
   bufferlist bl;
   do_write(bl);
@@ -766,7 +766,11 @@ int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
     if (room >= (header.max_size >> 1) &&
         room - size < (header.max_size >> 1)) {
       dout(10) << " passing half full mark, triggering commit" << dendl;
-      do_sync_cond->SloppySignal();  // initiate a real commit so we can trim
+#ifdef CEPH_DEBUG_MUTEX
+      do_sync_cond->notify_all(true);  // initiate a real commit so we can trim
+#else
+      do_sync_cond->notify_all();
+#endif
     }
   }
 
@@ -812,7 +816,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
        items.erase(it++);
 #ifdef HAVE_LIBAIO
        {
-         Mutex::Locker locker(aio_lock);
+         std::lock_guard locker{aio_lock};
          ceph_assert(aio_write_queue_ops > 0);
          aio_write_queue_ops--;
          ceph_assert(aio_write_queue_bytes >= bytes);
@@ -894,7 +898,7 @@ void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
 
 void FileJournal::queue_completions_thru(uint64_t seq)
 {
-  ceph_assert(finisher_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(finisher_lock));
   utime_t now = ceph_clock_now();
   list<completion_item> items;
   batch_pop_completions(items);
@@ -922,7 +926,7 @@ void FileJournal::queue_completions_thru(uint64_t seq)
     items.erase(it++);
   }
   batch_unpop_completions(items);
-  finisher_cond.Signal();
+  finisher_cond.notify_all();
 }
 
 
@@ -1041,7 +1045,7 @@ void FileJournal::do_write(bufferlist& bl)
   if (write_pos >= header.max_size)
     write_pos = write_pos - header.max_size + get_top();
 
-  write_lock.Unlock();
+  write_lock.unlock();
 
   // split?
   off64_t split = 0;
@@ -1142,13 +1146,13 @@ void FileJournal::do_write(bufferlist& bl)
   utime_t lat = ceph_clock_now() - from;
   dout(20) << "do_write latency " << lat << dendl;
 
-  write_lock.Lock();
+  write_lock.lock();
 
   ceph_assert(write_pos == pos);
   ceph_assert(write_pos % header.alignment == 0);
 
   {
-    Mutex::Locker locker(finisher_lock);
+    std::lock_guard locker{finisher_lock};
     journaled_seq = writing_seq;
 
     // kick finisher?
@@ -1172,9 +1176,8 @@ void FileJournal::flush()
 {
   dout(10) << "waiting for completions to empty" << dendl;
   {
-    Mutex::Locker l(finisher_lock);
-    while (!completions_empty())
-      finisher_cond.Wait(finisher_lock);
+    std::unique_lock l{finisher_lock};
+    finisher_cond.wait(l, [this] { return completions_empty(); });
   }
   dout(10) << "flush waiting for finisher" << dendl;
   finisher->wait_for_empty();
@@ -1187,12 +1190,12 @@ void FileJournal::write_thread_entry()
   dout(10) << "write_thread_entry start" << dendl;
   while (1) {
     {
-      Mutex::Locker locker(writeq_lock);
+      std::unique_lock locker{writeq_lock};
       if (writeq.empty() && !must_write_header) {
        if (write_stop)
          break;
        dout(20) << "write_thread_entry going to sleep" << dendl;
-       writeq_cond.Wait(writeq_lock);
+       writeq_cond.wait(locker);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
       }
@@ -1200,7 +1203,7 @@ void FileJournal::write_thread_entry()
 
 #ifdef HAVE_LIBAIO
     if (aio) {
-      Mutex::Locker locker(aio_lock);
+      std::unique_lock locker{aio_lock};
       // should we back off to limit aios in flight?  try to do this
       // adaptively so that we submit larger aios once we have lots of
       // them in flight.
@@ -1224,13 +1227,13 @@ void FileJournal::write_thread_entry()
        dout(20) << "write_thread_entry deferring until more aios complete: "
                 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
                 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
-       aio_cond.Wait(aio_lock);
+       aio_cond.wait(locker);
        dout(20) << "write_thread_entry woke up" << dendl;
       }
     }
 #endif
 
-    Mutex::Locker locker(write_lock);
+    std::unique_lock locker{write_lock};
     uint64_t orig_ops = 0;
     uint64_t orig_bytes = 0;
 
@@ -1249,7 +1252,7 @@ void FileJournal::write_thread_entry()
        r = 0;
       } else {
        dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
-       commit_cond.Wait(write_lock);
+       commit_cond.wait(locker);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
       }
@@ -1384,7 +1387,7 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
 
     // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
     // modified in check_aio_completion
-    aio_lock.Lock();
+    aio_lock.lock();
     aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
     aio_info& aio = aio_queue.back();
     aio.iov = iov;
@@ -1401,7 +1404,7 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
     // aio could be ereased from aio_queue once it is done
     uint64_t cur_len = aio.len;
     // unlock aio_lock because following io_submit might take time to return
-    aio_lock.Unlock();
+    aio_lock.unlock();
 
     iocb *piocb = &aio.iocb;
 
@@ -1427,9 +1430,9 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
     } while (true);
     pos += cur_len;
   }
-  aio_lock.Lock();
-  write_finish_cond.Signal();
-  aio_lock.Unlock();
+  aio_lock.lock();
+  write_finish_cond.notify_all();
+  aio_lock.unlock();
   return 0;
 }
 #endif
@@ -1440,12 +1443,12 @@ void FileJournal::write_finish_thread_entry()
   dout(10) << __func__ << " enter" << dendl;
   while (true) {
     {
-      Mutex::Locker locker(aio_lock);
+      std::unique_lock locker{aio_lock};
       if (aio_queue.empty()) {
        if (aio_stop)
          break;
        dout(20) << __func__ << " sleeping" << dendl;
-       write_finish_cond.Wait(aio_lock);
+       write_finish_cond.wait(locker);
        continue;
       }
     }
@@ -1466,7 +1469,7 @@ void FileJournal::write_finish_thread_entry()
     }
 
     {
-      Mutex::Locker locker(aio_lock);
+      std::lock_guard locker{aio_lock};
       for (int i=0; i<r; i++) {
        aio_info *ai = (aio_info *)event[i].obj;
        if (event[i].res != ai->len) {
@@ -1491,7 +1494,7 @@ void FileJournal::write_finish_thread_entry()
  */
 void FileJournal::check_aio_completion()
 {
-  ceph_assert(aio_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(aio_lock));
   dout(20) << "check_aio_completion" << dendl;
 
   bool completed_something = false, signal = false;
@@ -1514,7 +1517,7 @@ void FileJournal::check_aio_completion()
   if (completed_something) {
     // kick finisher?
     //  only if we haven't filled up recently!
-    Mutex::Locker locker(finisher_lock);
+    std::lock_guard locker{finisher_lock};
     journaled_seq = new_journaled_seq;
     if (full_state != FULL_NOTFULL) {
       dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
@@ -1531,7 +1534,7 @@ void FileJournal::check_aio_completion()
   }
   if (signal) {
     // maybe write queue was waiting for aio count to drop?
-    aio_cond.Signal();
+    aio_cond.notify_all();
   }
 }
 #endif
@@ -1618,23 +1621,23 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
     }
   }
   {
-    Mutex::Locker l1(writeq_lock);
+    std::lock_guard l1{writeq_lock};
 #ifdef HAVE_LIBAIO
-    Mutex::Locker l2(aio_lock);
+    std::lock_guard l2{aio_lock};
 #endif
-    Mutex::Locker l3(completions_lock);
+    std::lock_guard l3{completions_lock};
 
 #ifdef HAVE_LIBAIO
     aio_write_queue_ops++;
     aio_write_queue_bytes += e.length();
-    aio_cond.Signal();
+    aio_cond.notify_all();
 #endif
 
     completions.push_back(
       completion_item(
        seq, oncommit, ceph_clock_now(), osd_op));
     if (writeq.empty())
-      writeq_cond.Signal();
+      writeq_cond.notify_all();
     writeq.push_back(write_item(seq, e, orig_len, osd_op));
     if (osd_op)
       osd_op->journal_trace.keyval("queue depth", writeq.size());
@@ -1643,21 +1646,21 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
 
 bool FileJournal::writeq_empty()
 {
-  Mutex::Locker locker(writeq_lock);
+  std::lock_guard locker{writeq_lock};
   return writeq.empty();
 }
 
 FileJournal::write_item &FileJournal::peek_write()
 {
-  ceph_assert(write_lock.is_locked());
-  Mutex::Locker locker(writeq_lock);
+  ceph_assert(ceph_mutex_is_locked(write_lock));
+  std::lock_guard locker{writeq_lock};
   return writeq.front();
 }
 
 void FileJournal::pop_write()
 {
-  ceph_assert(write_lock.is_locked());
-  Mutex::Locker locker(writeq_lock);
+  ceph_assert(ceph_mutex_is_locked(write_lock));
+  std::lock_guard locker{writeq_lock};
   if (logger) {
     logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
     logger->dec(l_filestore_journal_queue_ops, 1);
@@ -1667,9 +1670,9 @@ void FileJournal::pop_write()
 
 void FileJournal::batch_pop_write(list<write_item> &items)
 {
-  ceph_assert(write_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(write_lock));
   {
-    Mutex::Locker locker(writeq_lock);
+    std::lock_guard locker{writeq_lock};
     writeq.swap(items);
   }
   for (auto &&i : items) {
@@ -1682,14 +1685,14 @@ void FileJournal::batch_pop_write(list<write_item> &items)
 
 void FileJournal::batch_unpop_write(list<write_item> &items)
 {
-  ceph_assert(write_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(write_lock));
   for (auto &&i : items) {
     if (logger) {
       logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
       logger->inc(l_filestore_journal_queue_ops, 1);
     }
   }
-  Mutex::Locker locker(writeq_lock);
+  std::lock_guard locker{writeq_lock};
   writeq.splice(writeq.begin(), items);
 }
 
@@ -1747,7 +1750,7 @@ void FileJournal::do_discard(int64_t offset, int64_t end)
 
 void FileJournal::committed_thru(uint64_t seq)
 {
-  Mutex::Locker locker(write_lock);
+  std::lock_guard locker{write_lock};
 
   auto released = throttle.flush(seq);
   if (logger) {
@@ -1770,7 +1773,7 @@ void FileJournal::committed_thru(uint64_t seq)
 
   // completions!
   {
-    Mutex::Locker locker(finisher_lock);
+    std::lock_guard locker{finisher_lock};
     queue_completions_thru(seq);
     if (plug_journal_completions && seq >= header.start_seq) {
       dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
@@ -1815,7 +1818,7 @@ void FileJournal::committed_thru(uint64_t seq)
     pop_write();
   }
 
-  commit_cond.Signal();
+  commit_cond.notify_all();
 
   dout(10) << "committed_thru done" << dendl;
 }
index 2313b4b8d0cc4bc9f74d3cb19bd1fa20573f1b16..e9d41331bcc08ab40f4f966e0939491bf6290f59 100644 (file)
 #ifndef CEPH_FILEJOURNAL_H
 #define CEPH_FILEJOURNAL_H
 
-#include <stdlib.h>
+#include <condition_variable>
 #include <deque>
+#include <mutex>
+#include <stdlib.h>
 using std::deque;
 
 #include "Journal.h"
 #include "common/config_fwd.h"
 #include "common/Cond.h"
-#include "common/Mutex.h"
 #include "common/Thread.h"
 #include "common/Throttle.h"
 #include "JournalThrottle.h"
@@ -68,13 +69,13 @@ public:
     write_item() : seq(0), orig_len(0) {}
   };
 
-  Mutex finisher_lock;
-  Cond finisher_cond;
+  ceph::mutex finisher_lock = ceph::make_mutex("FileJournal::finisher_lock");
+  ceph::condition_variable finisher_cond;
   uint64_t journaled_seq;
   bool plug_journal_completions;
 
-  Mutex writeq_lock;
-  Cond writeq_cond;
+  ceph::mutex writeq_lock = ceph::make_mutex("FileJournal::writeq_lock");
+  ceph::condition_variable writeq_cond;
   list<write_item> writeq;
   bool writeq_empty();
   write_item &peek_write();
@@ -82,27 +83,28 @@ public:
   void batch_pop_write(list<write_item> &items);
   void batch_unpop_write(list<write_item> &items);
 
-  Mutex completions_lock;
+  ceph::mutex completions_lock =
+    ceph::make_mutex("FileJournal::completions_lock");
   list<completion_item> completions;
   bool completions_empty() {
-    Mutex::Locker l(completions_lock);
+    std::lock_guard l{completions_lock};
     return completions.empty();
   }
   void batch_pop_completions(list<completion_item> &items) {
-    Mutex::Locker l(completions_lock);
+    std::lock_guard l{completions_lock};
     completions.swap(items);
   }
   void batch_unpop_completions(list<completion_item> &items) {
-    Mutex::Locker l(completions_lock);
+    std::lock_guard l{completions_lock};
     completions.splice(completions.begin(), items);
   }
   completion_item completion_peek_front() {
-    Mutex::Locker l(completions_lock);
+    std::lock_guard l{completions_lock};
     ceph_assert(!completions.empty());
     return completions.front();
   }
   void completion_pop_front() {
-    Mutex::Locker l(completions_lock);
+    std::lock_guard l{completions_lock};
     ceph_assert(!completions.empty());
     completions.pop_front();
   }
@@ -269,14 +271,14 @@ private:
       delete[] iov;
     }
   };
-  Mutex aio_lock;
-  Cond aio_cond;
-  Cond write_finish_cond;
-  io_context_t aio_ctx;
+  ceph::mutex aio_lock = ceph::make_mutex("FileJournal::aio_lock");
+  ceph::condition_variable aio_cond;
+  ceph::condition_variable write_finish_cond;
+  io_context_t aio_ctx = 0;
   list<aio_info> aio_queue;
-  int aio_num, aio_bytes;
-  uint64_t aio_write_queue_ops;
-  uint64_t aio_write_queue_bytes;
+  int aio_num = 0, aio_bytes = 0;
+  uint64_t aio_write_queue_ops = 0;
+  uint64_t aio_write_queue_bytes = 0;
   /// End protected by aio_lock
 #endif
 
@@ -326,11 +328,11 @@ private:
   JournalThrottle throttle;
 
   // write thread
-  Mutex write_lock;
+  ceph::mutex write_lock = ceph::make_mutex("FileJournal::write_lock");
   bool write_stop;
   bool aio_stop;
 
-  Cond commit_cond;
+  ceph::condition_variable commit_cond;
 
   int _open(bool wr, bool create=false);
   int _open_block_device();
@@ -398,15 +400,11 @@ private:
   ZTracer::Endpoint trace_endpoint;
 
  public:
-  FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
+  FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, ceph::condition_variable *sync_cond,
              const char *f, bool dio=false, bool ai=true, bool faio=false) :
     Journal(cct, fsid, fin, sync_cond),
-    finisher_lock("FileJournal::finisher_lock", false, true, false),
     journaled_seq(0),
     plug_journal_completions(false),
-    writeq_lock("FileJournal::writeq_lock", false, true, false),
-    completions_lock(
-      "FileJournal::completions_lock", false, true, false),
     fn(f),
     zero_buf(NULL),
     max_size(0), block_size(0),
@@ -414,20 +412,12 @@ private:
     must_write_header(false),
     write_pos(0), read_pos(0),
     discard(false),
-#ifdef HAVE_LIBAIO
-    aio_lock("FileJournal::aio_lock"),
-    aio_ctx(0),
-    aio_num(0), aio_bytes(0),
-    aio_write_queue_ops(0),
-    aio_write_queue_bytes(0),
-#endif
     last_committed_seq(0),
     journaled_since_start(0),
     full_state(FULL_NOTFULL),
     fd(-1),
     writing_seq(0),
     throttle(cct->_conf->filestore_caller_concurrency),
-    write_lock("FileJournal::write_lock", false, true, false),
     write_stop(true),
     aio_stop(true),
     write_thread(this),
index ac7c6157075b2afeaf3833ec27086445f66d3901..99571d582ea8a513035b30ece2ceb1cfaf81738c 100644 (file)
@@ -236,7 +236,7 @@ int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *b
     return r;
 
   ceph_assert(index.index);
-  RWLock::RLocker l((index.index)->access_lock);
+  std::shared_lock l{(index.index)->access_lock};
 
   r = lfn_find(oid, index, &path);
   if (r < 0)
@@ -281,13 +281,13 @@ int FileStore::lfn_open(const coll_t& cid,
   int fd, exist;
   ceph_assert((*index).index);
   if (need_lock) {
-    ((*index).index)->access_lock.get_write();
+    ((*index).index)->access_lock.lock();
   }
   if (!replaying) {
     *outfd = fdcache.lookup(oid);
     if (*outfd) {
       if (need_lock) {
-        ((*index).index)->access_lock.put_write();
+        ((*index).index)->access_lock.unlock();
       }
       return 0;
     }
@@ -342,7 +342,7 @@ int FileStore::lfn_open(const coll_t& cid,
   }
 
   if (need_lock) {
-    ((*index).index)->access_lock.put_write();
+    ((*index).index)->access_lock.unlock();
   }
 
   return 0;
@@ -350,7 +350,7 @@ int FileStore::lfn_open(const coll_t& cid,
  fail:
 
   if (need_lock) {
-    ((*index).index)->access_lock.put_write();
+    ((*index).index)->access_lock.unlock();
   }
 
   if (r == -EIO && m_filestore_fail_eio) handle_eio();
@@ -395,7 +395,7 @@ int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t&
 
   if (!index_same) {
 
-    RWLock::RLocker l1((index_old.index)->access_lock);
+    std::shared_lock l1{(index_old.index)->access_lock};
 
     r = index_old->lookup(o, &path_old, &exist);
     if (r < 0) {
@@ -405,7 +405,7 @@ int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t&
     if (!exist)
       return -ENOENT;
 
-    RWLock::WLocker l2((index_new.index)->access_lock);
+    std::unique_lock l2{(index_new.index)->access_lock};
 
     r = index_new->lookup(newoid, &path_new, &exist);
     if (r < 0) {
@@ -427,7 +427,7 @@ int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t&
       return r;
     }
   } else {
-    RWLock::WLocker l1((index_old.index)->access_lock);
+    std::unique_lock l1{(index_old.index)->access_lock};
 
     r = index_old->lookup(o, &path_old, &exist);
     if (r < 0) {
@@ -475,7 +475,7 @@ int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
   }
 
   ceph_assert(index.index);
-  RWLock::WLocker l((index.index)->access_lock);
+  std::unique_lock l{(index.index)->access_lock};
 
   {
     IndexedPath path;
@@ -541,12 +541,9 @@ FileStore::FileStore(CephContext* cct, const std::string &base,
   basedir_fd(-1), current_fd(-1),
   backend(nullptr),
   index_manager(cct, do_update),
-  lock("FileStore::lock"),
   force_sync(false),
-  sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
   timer(cct, sync_entry_timeo_lock),
   stop(false), sync_thread(this),
-  coll_lock("FileStore::coll_lock"),
   fdcache(cct),
   wbthrottle(cct),
   next_osr_id(0),
@@ -561,7 +558,6 @@ FileStore::FileStore(CephContext* cct, const std::string &base,
        cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
   logger(nullptr),
   trace_endpoint("0.0.0.0", 0, "FileStore"),
-  read_error_lock("FileStore::read_error_lock"),
   m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
   m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
   m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
@@ -1907,7 +1903,7 @@ int FileStore::mount()
        goto close_current_fd;
       }
       ceph_assert(index.index);
-      RWLock::WLocker l((index.index)->access_lock);
+      std::unique_lock l{(index.index)->access_lock};
 
       index->cleanup();
     }
@@ -1973,10 +1969,11 @@ int FileStore::mount()
 
 stop_sync:
   // stop sync thread
-  lock.Lock();
-  stop = true;
-  sync_cond.Signal();
-  lock.Unlock();
+  {
+    std::lock_guard l{lock};
+    stop = true;
+    sync_cond.notify_all();
+  }
   sync_thread.join();
   if (!m_disable_wbthrottle) {
     wbthrottle.stop();
@@ -2047,14 +2044,15 @@ int FileStore::umount()
   do_force_sync();
 
   {
-    Mutex::Locker l(coll_lock);
+    std::lock_guard l(coll_lock);
     coll_map.clear();
   }
 
-  lock.Lock();
-  stop = true;
-  sync_cond.Signal();
-  lock.Unlock();
+  {
+    std::lock_guard l{lock};
+    stop = true;
+    sync_cond.notify_all();
+  }
   sync_thread.join();
   if (!m_disable_wbthrottle){
     wbthrottle.stop();
@@ -2101,7 +2099,7 @@ int FileStore::umount()
   object_map.reset();
 
   {
-    Mutex::Locker l(sync_entry_timeo_lock);
+    std::lock_guard l{sync_entry_timeo_lock};
     timer.shutdown();
   }
 
@@ -2118,7 +2116,7 @@ int FileStore::umount()
 
 ObjectStore::CollectionHandle FileStore::open_collection(const coll_t& c)
 {
-  Mutex::Locker l(coll_lock);
+  std::lock_guard l{coll_lock};
   auto p = coll_map.find(c);
   if (p == coll_map.end()) {
     return CollectionHandle();
@@ -2128,7 +2126,7 @@ ObjectStore::CollectionHandle FileStore::open_collection(const coll_t& c)
 
 ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c)
 {
-  Mutex::Locker l(coll_lock);
+  std::lock_guard l{coll_lock};
   auto p = coll_map.find(c);
   if (p == coll_map.end()) {
     auto *r = new OpSequencer(cct, ++next_osr_id, c);
@@ -2219,7 +2217,7 @@ void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
     dout(5) << __FUNC__ << ": done stalling" << dendl;
   }
 
-  osr->apply_lock.Lock();
+  osr->apply_lock.lock();
   Op *o = osr->peek_queue();
   o->trace.event("op_apply_start");
   apply_manager.op_apply_start(o->op);
@@ -2243,7 +2241,7 @@ void FileStore::_finish_op(OpSequencer *osr)
   lat -= o->start;
 
   dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " lat " << lat << dendl;
-  osr->apply_lock.Unlock();  // locked in _do_op
+  osr->apply_lock.unlock();  // locked in _do_op
   o->trace.event("_finish_op");
 
   // called with tp lock held
@@ -3811,7 +3809,7 @@ int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobjec
       goto out2;
     }
     ceph_assert(index.index);
-    RWLock::WLocker l((index.index)->access_lock);
+    std::unique_lock l{(index.index)->access_lock};
 
     r = lfn_open(cid, newoid, true, &n, &index);
     if (r < 0) {
@@ -4130,17 +4128,14 @@ private:
 
 void FileStore::sync_entry()
 {
-  lock.Lock();
+  std::unique_lock l{lock};
   while (!stop) {
-    utime_t max_interval;
-    max_interval.set_from_double(m_filestore_max_sync_interval);
-    utime_t min_interval;
-    min_interval.set_from_double(m_filestore_min_sync_interval);
-
-    utime_t startwait = ceph_clock_now();
+    auto min_interval = ceph::make_timespan(m_filestore_min_sync_interval);
+    auto max_interval = ceph::make_timespan(m_filestore_max_sync_interval);
+    auto startwait = ceph::real_clock::now();
     if (!force_sync) {
       dout(20) << __FUNC__ << ":  waiting for max_interval " << max_interval << dendl;
-      sync_cond.WaitInterval(lock, max_interval);
+      sync_cond.wait_for(l, max_interval);
     } else {
       dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
     }
@@ -4153,36 +4148,34 @@ void FileStore::sync_entry()
       break;
     } else {
       // wait for at least the min interval
-      utime_t woke = ceph_clock_now();
-      woke -= startwait;
+      auto woke = ceph::real_clock::now() - startwait;
       dout(20) << __FUNC__ << ": woke after " << woke << dendl;
       if (woke < min_interval) {
-       utime_t t = min_interval;
-       t -= woke;
+       auto t = min_interval - woke;
        dout(20) << __FUNC__ << ": waiting for another " << t
                 << " to reach min interval " << min_interval << dendl;
-       sync_cond.WaitInterval(lock, t);
+       sync_cond.wait_for(l, t);
       }
     }
 
     list<Context*> fin;
   again:
     fin.swap(sync_waiters);
-    lock.Unlock();
+    l.unlock();
 
     op_tp.pause();
     if (apply_manager.commit_start()) {
-      utime_t start = ceph_clock_now();
+      auto start = ceph::real_clock::now();
       uint64_t cp = apply_manager.get_committing_seq();
 
-      sync_entry_timeo_lock.Lock();
+      sync_entry_timeo_lock.lock();
       SyncEntryTimeout *sync_entry_timeo =
        new SyncEntryTimeout(cct, m_filestore_commit_timeout);
       if (!timer.add_event_after(m_filestore_commit_timeout,
                                 sync_entry_timeo)) {
        sync_entry_timeo = nullptr;
       }
-      sync_entry_timeo_lock.Unlock();
+      sync_entry_timeo_lock.unlock();
 
       logger->set(l_filestore_committing, 1);
 
@@ -4251,12 +4244,12 @@ void FileStore::sync_entry()
        }
       }
 
-      utime_t done = ceph_clock_now();
-      utime_t lat = done - start;
-      utime_t dur = done - startwait;
+      auto done = ceph::real_clock::now();
+      auto lat = done - start;
+      auto dur = done - startwait;
       dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
       utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
-      if (max_pause_lat < dur - lat) {
+      if (max_pause_lat < utime_t{dur - lat}) {
         logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
       }
 
@@ -4289,14 +4282,14 @@ void FileStore::sync_entry()
       dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
 
       if (sync_entry_timeo) {
-       Mutex::Locker lock(sync_entry_timeo_lock);
+       std::lock_guard lock{sync_entry_timeo_lock};
        timer.cancel_event(sync_entry_timeo);
       }
     } else {
       op_tp.unpause();
     }
 
-    lock.Lock();
+    l.lock();
     finish_contexts(cct, fin, 0);
     fin.clear();
     if (!sync_waiters.empty()) {
@@ -4309,41 +4302,41 @@ void FileStore::sync_entry()
     }
   }
   stop = false;
-  lock.Unlock();
 }
 
 void FileStore::do_force_sync()
 {
   dout(10) << __FUNC__ << dendl;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   force_sync = true;
-  sync_cond.Signal();
+  sync_cond.notify_all();
 }
 
 void FileStore::start_sync(Context *onsafe)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   sync_waiters.push_back(onsafe);
-  sync_cond.Signal();
+  sync_cond.notify_all();
   force_sync = true;
   dout(10) << __FUNC__ << dendl;
 }
 
 void FileStore::sync()
 {
-  Mutex l("FileStore::sync");
-  Cond c;
+  ceph::mutex m = ceph::make_mutex("FileStore::sync");
+  ceph::condition_variable c;
   bool done;
-  C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
+  C_SafeCond *fin = new C_SafeCond(m, c, &done);
 
   start_sync(fin);
 
-  l.Lock();
-  while (!done) {
-    dout(10) << "sync waiting" << dendl;
-    c.Wait(l);
-  }
-  l.Unlock();
+  std::unique_lock l{m};
+  c.wait(l, [&done, this] {
+    if (!done) {
+      dout(10) << "sync waiting" << dendl;
+    }
+    return done;
+  });
   dout(10) << "sync done" << dendl;
 }
 
@@ -4366,11 +4359,10 @@ void FileStore::flush()
 
   if (cct->_conf->filestore_blackhole) {
     // wait forever
-    Mutex lock("FileStore::flush::lock");
-    Cond cond;
-    lock.Lock();
-    while (true)
-      cond.Wait(lock);
+    ceph::mutex lock = ceph::make_mutex("FileStore::flush::lock");
+    ceph::condition_variable cond;
+    std::unique_lock l{lock};
+    cond.wait(l, [] {return false;} );
     ceph_abort();
   }
 
@@ -4531,24 +4523,24 @@ int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
 
 // debug EIO injection
 void FileStore::inject_data_error(const ghobject_t &oid) {
-  Mutex::Locker l(read_error_lock);
+  std::lock_guard l{read_error_lock};
   dout(10) << __FUNC__ << ": init error on " << oid << dendl;
   data_error_set.insert(oid);
 }
 void FileStore::inject_mdata_error(const ghobject_t &oid) {
-  Mutex::Locker l(read_error_lock);
+  std::lock_guard l{read_error_lock};
   dout(10) << __FUNC__ << ": init error on " << oid << dendl;
   mdata_error_set.insert(oid);
 }
 
 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
-  Mutex::Locker l(read_error_lock);
+  std::lock_guard l{read_error_lock};
   dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
   data_error_set.erase(oid);
   mdata_error_set.erase(oid);
 }
 bool FileStore::debug_data_eio(const ghobject_t &oid) {
-  Mutex::Locker l(read_error_lock);
+  std::lock_guard l{read_error_lock};
   if (data_error_set.count(oid)) {
     dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
     return true;
@@ -4557,7 +4549,7 @@ bool FileStore::debug_data_eio(const ghobject_t &oid) {
   }
 }
 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
-  Mutex::Locker l(read_error_lock);
+  std::lock_guard l{read_error_lock};
   if (mdata_error_set.count(oid)) {
     dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
     return true;
@@ -5063,7 +5055,7 @@ int FileStore::collection_empty(const coll_t& cid, bool *empty)
   }
 
   ceph_assert(index.index);
-  RWLock::RLocker l((index.index)->access_lock);
+  std::shared_lock l{(index.index)->access_lock};
 
   vector<ghobject_t> ls;
   r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
@@ -5188,7 +5180,7 @@ int FileStore::collection_list(const coll_t& c,
     return r;
 
   ceph_assert(index.index);
-  RWLock::RLocker l((index.index)->access_lock);
+  std::shared_lock l{(index.index)->access_lock};
 
   r = index->collection_list_partial(start, end, max, ls, next);
 
@@ -5225,7 +5217,7 @@ int FileStore::omap_get(CollectionHandle& ch, const ghobject_t &hoid,
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5258,7 +5250,7 @@ int FileStore::omap_get_header(
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5287,7 +5279,7 @@ int FileStore::omap_get_keys(CollectionHandle& ch, const ghobject_t &hoid, set<s
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5321,7 +5313,7 @@ int FileStore::omap_get_values(CollectionHandle& ch, const ghobject_t &hoid,
   }
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0) {
       where = " (lfn_find)";
@@ -5359,7 +5351,7 @@ int FileStore::omap_check_keys(CollectionHandle& ch, const ghobject_t &hoid,
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5397,7 +5389,7 @@ ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
   }
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0) {
       dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
@@ -5486,7 +5478,7 @@ int FileStore::_destroy_collection(const coll_t& c)
     if (r < 0)
       goto out;
     ceph_assert(from.index);
-    RWLock::WLocker l((from.index)->access_lock);
+    std::unique_lock l{(from.index)->access_lock};
 
     r = from->prep_delete();
     if (r < 0)
@@ -5692,7 +5684,7 @@ int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5720,7 +5712,7 @@ int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
   }
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0) {
       dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
@@ -5753,7 +5745,7 @@ int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5793,7 +5785,7 @@ int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
     return r;
   {
     ceph_assert(index.index);
-    RWLock::RLocker l((index.index)->access_lock);
+    std::shared_lock l{(index.index)->access_lock};
     r = lfn_find(hoid, index);
     if (r < 0)
       return r;
@@ -5852,10 +5844,10 @@ int FileStore::_merge_collection(const coll_t& cid,
 
     if (!r) {
       ceph_assert(from.index);
-      RWLock::WLocker l1((from.index)->access_lock);
+      std::unique_lock l1{(from.index)->access_lock};
 
       ceph_assert(to.index);
-      RWLock::WLocker l2((to.index)->access_lock);
+      std::unique_lock l2{(to.index)->access_lock};
 
       r = from->merge(bits, to.index);
     }
@@ -5872,10 +5864,10 @@ int FileStore::_merge_collection(const coll_t& cid,
 
     if (!r) {
       ceph_assert(from.index);
-      RWLock::WLocker l1((from.index)->access_lock);
+      std::unique_lock l1{(from.index)->access_lock};
 
       ceph_assert(to.index);
-      RWLock::WLocker l2((to.index)->access_lock);
+      std::unique_lock l2{(to.index)->access_lock};
 
       r = from->merge(bits, to.index);
     }
@@ -5959,10 +5951,10 @@ int FileStore::_split_collection(const coll_t& cid,
 
     if (!r) {
       ceph_assert(from.index);
-      RWLock::WLocker l1((from.index)->access_lock);
+      std::unique_lock l1{(from.index)->access_lock};
 
       ceph_assert(to.index);
-      RWLock::WLocker l2((to.index)->access_lock);
+      std::unique_lock l2{(to.index)->access_lock};
 
       r = from->split(rem, bits, to.index);
     }
@@ -6100,7 +6092,7 @@ void FileStore::handle_conf_change(const ConfigProxy& conf,
       changed.count("filestore_max_xattr_value_size_btrfs") ||
       changed.count("filestore_max_xattr_value_size_other")) {
     if (backend) {
-      Mutex::Locker l(lock);
+      std::lock_guard l(lock);
       set_xattr_limits_via_conf();
     }
   }
@@ -6113,7 +6105,7 @@ void FileStore::handle_conf_change(const ConfigProxy& conf,
       changed.count("filestore_queue_high_threshhold") ||
       changed.count("filestore_queue_high_delay_multiple") ||
       changed.count("filestore_queue_max_delay_multiple")) {
-    Mutex::Locker l(lock);
+    std::lock_guard l(lock);
     set_throttle_params();
   }
 
@@ -6125,7 +6117,7 @@ void FileStore::handle_conf_change(const ConfigProxy& conf,
       changed.count("filestore_sloppy_crc_block_size") ||
       changed.count("filestore_max_alloc_hint_size") ||
       changed.count("filestore_fadvise")) {
-    Mutex::Locker l(lock);
+    std::lock_guard l(lock);
     m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
     m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
     m_filestore_kill_at = conf->filestore_kill_at;
@@ -6136,7 +6128,7 @@ void FileStore::handle_conf_change(const ConfigProxy& conf,
     m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
   }
   if (changed.count("filestore_commit_timeout")) {
-    Mutex::Locker l(sync_entry_timeo_lock);
+    std::lock_guard l(sync_entry_timeo_lock);
     m_filestore_commit_timeout = conf->filestore_commit_timeout;
   }
   if (changed.count("filestore_dump_file")) {
@@ -6404,7 +6396,7 @@ void FileStore::OpSequencer::_unregister_apply(Op *o)
 
 void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
 {
-  Mutex::Locker l(qlock);
+  std::unique_lock l{qlock};
   uint32_t key = oid.hobj.get_hash();
 retry:
   while (true) {
@@ -6415,7 +6407,7 @@ retry:
       if (*p->second == oid) {
        dout(20) << __func__ << " " << oid << " waiting on " << p->second
                 << dendl;
-       cond.Wait(qlock);
+       cond.wait(l);
        goto retry;
       }
       ++p;
index e09b9e042e929d37dc3339064a1c7fee932b1a83..def23b8aaaba726f7bf027eb169e6c9bda816709 100644 (file)
@@ -38,7 +38,7 @@
 #include "common/perf_counters.h"
 #include "common/zipkin_trace.h"
 
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "HashIndex.h"
 #include "IndexManager.h"
 #include "os/ObjectMap.h"
@@ -194,11 +194,11 @@ private:
   int lock_fsid();
 
   // sync thread
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("FileStore::lock");
   bool force_sync;
-  Cond sync_cond;
+  ceph::condition_variable sync_cond;
 
-  Mutex sync_entry_timeo_lock;
+  ceph::mutex sync_entry_timeo_lock = ceph::make_mutex("FileStore::sync_entry_timeo_lock");
   SafeTimer timer;
 
   list<Context*> sync_waiters;
@@ -226,16 +226,20 @@ private:
   };
   class OpSequencer : public CollectionImpl {
     CephContext *cct;
-    Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
+    // to protect q, for benefit of flush (peek/dequeue also protected by lock)
+    ceph::mutex qlock =
+      ceph::make_mutex("FileStore::OpSequencer::qlock", false);
     list<Op*> q;
     list<uint64_t> jq;
     list<pair<uint64_t, Context*> > flush_commit_waiters;
-    Cond cond;
+    ceph::condition_variable cond;
     string osr_name_str;
     /// hash of pointers to ghobject_t's for in-flight writes
     unordered_multimap<uint32_t,const ghobject_t*> applying;
   public:
-    Mutex apply_lock;  // for apply mutual exclusion
+    // for apply mutual exclusion
+    ceph::mutex apply_lock =
+      ceph::make_mutex("FileStore::OpSequencer::apply_lock", false);
     int id;
     const char *osr_name;
 
@@ -243,7 +247,6 @@ private:
     bool _get_max_uncompleted(
       uint64_t *seq ///< [out] max uncompleted seq
       ) {
-      ceph_assert(qlock.is_locked());
       ceph_assert(seq);
       *seq = 0;
       if (q.empty() && jq.empty())
@@ -261,7 +264,6 @@ private:
     bool _get_min_uncompleted(
       uint64_t *seq ///< [out] min uncompleted seq
       ) {
-      ceph_assert(qlock.is_locked());
       ceph_assert(seq);
       *seq = 0;
       if (q.empty() && jq.empty())
@@ -289,18 +291,18 @@ private:
     }
 
     void queue_journal(Op *o) {
-      Mutex::Locker l(qlock);
+      std::lock_guard l{qlock};
       jq.push_back(o->op);
       _register_apply(o);
     }
     void dequeue_journal(list<Context*> *to_queue) {
-      Mutex::Locker l(qlock);
+      std::lock_guard l{qlock};
       jq.pop_front();
-      cond.Signal();
+      cond.notify_all();
       _wake_flush_waiters(to_queue);
     }
     void queue(Op *o) {
-      Mutex::Locker l(qlock);
+      std::lock_guard l{qlock};
       q.push_back(o);
       _register_apply(o);
       o->trace.keyval("queue depth", q.size());
@@ -309,29 +311,27 @@ private:
     void _unregister_apply(Op *o);
     void wait_for_apply(const ghobject_t& oid);
     Op *peek_queue() {
-      Mutex::Locker l(qlock);
-      ceph_assert(apply_lock.is_locked());
+      std::lock_guard l{qlock};
+      ceph_assert(ceph_mutex_is_locked(apply_lock));
       return q.front();
     }
 
     Op *dequeue(list<Context*> *to_queue) {
       ceph_assert(to_queue);
-      ceph_assert(apply_lock.is_locked());
-      Mutex::Locker l(qlock);
+      ceph_assert(ceph_mutex_is_locked(apply_lock));
+      std::lock_guard l{qlock};
       Op *o = q.front();
       q.pop_front();
-      cond.Signal();
+      cond.notify_all();
       _unregister_apply(o);
       _wake_flush_waiters(to_queue);
       return o;
     }
 
     void flush() override {
-      Mutex::Locker l(qlock);
-
-      while (cct->_conf->filestore_blackhole)
-       cond.Wait(qlock);  // wait forever
-
+      std::unique_lock l{qlock};
+      // wait forever
+      cond.wait(l, [this] { return !cct->_conf->filestore_blackhole; });
 
       // get max for journal _or_ op queues
       uint64_t seq = 0;
@@ -342,13 +342,14 @@ private:
 
       if (seq) {
        // everything prior to our watermark to drain through either/both queues
-       while ((!q.empty() && q.front()->op <= seq) ||
-              (!jq.empty() && jq.front() <= seq))
-         cond.Wait(qlock);
+       cond.wait(l, [seq, this] {
+          return ((q.empty() || q.front()->op > seq) &&
+                 (jq.empty() || jq.front() > seq));
+        });
       }
     }
     bool flush_commit(Context *c) override {
-      Mutex::Locker l(qlock);
+      std::lock_guard l{qlock};
       uint64_t seq = 0;
       if (_get_max_uncompleted(&seq)) {
        return true;
@@ -361,9 +362,7 @@ private:
     OpSequencer(CephContext* cct, int i, coll_t cid)
       : CollectionImpl(cid),
        cct(cct),
-       qlock("FileStore::OpSequencer::qlock", false, false),
        osr_name_str(stringify(cid)),
-       apply_lock("FileStore::OpSequencer::apply_lock", false, false),
         id(i),
        osr_name(osr_name_str.c_str()) {}
     ~OpSequencer() override {
@@ -372,7 +371,7 @@ private:
   };
   typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
 
-  Mutex coll_lock;
+  ceph::mutex coll_lock = ceph::make_mutex("FileStore::coll_lock");
   map<coll_t,OpSequencerRef> coll_map;
 
   friend ostream& operator<<(ostream& out, const OpSequencer& s);
@@ -661,7 +660,7 @@ public:
   uint64_t estimate_objects_overhead(uint64_t num_objects) override;
 
   // DEBUG read error injection, an object is removed from both on delete()
-  Mutex read_error_lock;
+  ceph::mutex read_error_lock = ceph::make_mutex("FileStore::read_error_lock");
   set<ghobject_t> data_error_set; // read() will return -EIO
   set<ghobject_t> mdata_error_set; // getattr(),stat() will return -EIO
   void inject_data_error(const ghobject_t &oid) override;
index 730950264cac2bb9eab546b01008a8d0e26bd370..11e51b80daff797c14e6289ff87f0d665e76ad9f 100644 (file)
@@ -20,7 +20,6 @@
 
 #include <errno.h>
 
-#include "common/Mutex.h"
 #include "common/Cond.h"
 #include "common/config.h"
 #include "common/debug.h"
@@ -73,7 +72,7 @@ IndexManager::~IndexManager() {
 
 
 int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
-  RWLock::WLocker l(lock);
+  std::unique_lock l{lock};
   int r = set_version(path, version);
   if (r < 0)
     return r;
@@ -122,7 +121,7 @@ int IndexManager::build_index(coll_t c, const char *path, CollectionIndex **inde
 }
 
 bool IndexManager::get_index_optimistic(coll_t c, Index *index) {
-  RWLock::RLocker l(lock);
+  std::shared_lock l{lock};
   ceph::unordered_map<coll_t, CollectionIndex* > ::iterator it = col_indices.find(c);
   if (it == col_indices.end()) 
     return false;
@@ -133,7 +132,7 @@ bool IndexManager::get_index_optimistic(coll_t c, Index *index) {
 int IndexManager::get_index(coll_t c, const string& baseDir, Index *index) {
   if (get_index_optimistic(c, index))
     return 0;
-  RWLock::WLocker l(lock);
+  std::unique_lock l{lock};
   ceph::unordered_map<coll_t, CollectionIndex* > ::iterator it = col_indices.find(c);
   if (it == col_indices.end()) {
     char path[PATH_MAX];
index 19cd29264dd97d3a5260f5afeeb8420d2d69bd07..ec78d6448d3141ddffc6f3c03af071798a97ba13 100644 (file)
@@ -16,7 +16,7 @@
 
 #include "include/unordered_map.h"
 
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Cond.h"
 #include "common/config.h"
 #include "common/debug.h"
@@ -49,7 +49,8 @@ struct Index {
  */
 class IndexManager {
   CephContext* cct;
-  RWLock lock; ///< Lock for Index Manager
+  /// Lock for Index Manager
+  ceph::shared_mutex lock = ceph::make_shared_mutex("IndexManager lock");
   bool upgrade;
   ceph::unordered_map<coll_t, CollectionIndex* > col_indices;
 
@@ -70,7 +71,6 @@ public:
   /// Constructor
   explicit IndexManager(CephContext* cct,
                        bool upgrade) : cct(cct),
-                                       lock("IndexManager lock"),
                                        upgrade(upgrade) {}
 
   ~IndexManager();
index cfb667d89696a56a546e9251283e8ea5dfcd04ca..2c19ad478f0f14c9af8f35715dfddd030e18474b 100644 (file)
@@ -35,11 +35,11 @@ public:
   CephContext* cct;
   PerfCounters *logger;
 protected:
-  Cond *do_sync_cond;
+  ceph::condition_variable *do_sync_cond;
   bool wait_on_full;
 
 public:
-  Journal(CephContext* cct, uuid_d f, Finisher *fin, Cond *c=0) :
+  Journal(CephContext* cct, uuid_d f, Finisher *fin, ceph::condition_variable *c=0) :
     fsid(f), finisher(fin), cct(cct), logger(NULL),
     do_sync_cond(c),
     wait_on_full(false) { }
index 714d0935608ac6d55c383163054c46702798c1de..73daed3426fa216ad9d3168df5dd780e2f83277f 100644 (file)
@@ -122,11 +122,13 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
 
 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
 {
-  Mutex::Locker l(apply_lock);
-  while (blocked) {
-    dout(10) << "op_apply_start blocked, waiting" << dendl;
-    blocked_cond.Wait(apply_lock);
-  }
+  std::unique_lock l{apply_lock};
+  blocked_cond.wait(l, [this] {
+    if (blocked) {
+      dout(10) << "op_apply_start blocked, waiting" << dendl;
+    }
+    return !blocked;
+  });
   dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> "
           << (open_ops+1) << dendl;
   ceph_assert(!blocked);
@@ -137,7 +139,7 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
 
 void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
 {
-  Mutex::Locker l(apply_lock);
+  std::lock_guard l{apply_lock};
   dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> "
           << (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> "
           << std::max(op, max_applied_seq) << dendl;
@@ -146,7 +148,7 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
 
   // signal a blocked commit_start
   if (blocked) {
-    blocked_cond.Signal();
+    blocked_cond.notify_all();
   }
 
   // there can be multiple applies in flight; track the max value we
@@ -158,7 +160,7 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
 
 uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
 {
-  lock.Lock();
+  lock.lock();
   uint64_t op = ++op_seq;
   dout(10) << "op_submit_start " << op << dendl;
   return op;
@@ -173,7 +175,7 @@ void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
     ceph_abort_msg("out of order op_submit_finish");
   }
   op_submitted = op;
-  lock.Unlock();
+  lock.unlock();
 }
 
 
@@ -181,7 +183,7 @@ void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
 
 void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
 {
-  Mutex::Locker l(com_lock);
+  std::lock_guard l{com_lock};
   ceph_assert(c);
   commit_waiters[op].push_back(c);
 }
@@ -191,19 +193,21 @@ bool JournalingObjectStore::ApplyManager::commit_start()
   bool ret = false;
 
   {
-    Mutex::Locker l(apply_lock);
+    std::unique_lock l{apply_lock};
     dout(10) << "commit_start max_applied_seq " << max_applied_seq
             << ", open_ops " << open_ops << dendl;
     blocked = true;
-    while (open_ops > 0) {
-      dout(10) << "commit_start waiting for " << open_ops
-              << " open ops to drain" << dendl;
-      blocked_cond.Wait(apply_lock);
-    }
+    blocked_cond.wait(l, [this] {
+      if (open_ops > 0) {
+        dout(10) << "commit_start waiting for " << open_ops
+                << " open ops to drain" << dendl;
+      }
+      return open_ops == 0;
+    });
     ceph_assert(open_ops == 0);
     dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
     {
-      Mutex::Locker l(com_lock);
+      std::lock_guard l{com_lock};
       if (max_applied_seq == committed_seq) {
        dout(10) << "commit_start nothing to do" << dendl;
        blocked = false;
@@ -227,17 +231,17 @@ bool JournalingObjectStore::ApplyManager::commit_start()
 
 void JournalingObjectStore::ApplyManager::commit_started()
 {
-  Mutex::Locker l(apply_lock);
+  std::lock_guard l{apply_lock};
   // allow new ops. (underlying fs should now be committing all prior ops)
   dout(10) << "commit_started committing " << committing_seq << ", unblocking"
           << dendl;
   blocked = false;
-  blocked_cond.Signal();
+  blocked_cond.notify_all();
 }
 
 void JournalingObjectStore::ApplyManager::commit_finish()
 {
-  Mutex::Locker l(com_lock);
+  std::lock_guard l{com_lock};
   dout(10) << "commit_finish thru " << committing_seq << dendl;
 
   if (journal)
index a289d0e86c6137982ac7842d0c9af74659f86886..9eb3d36fe04d2d9a32ce67bc225d02e67765b0a0 100644 (file)
@@ -29,18 +29,18 @@ protected:
 
   class SubmitManager {
     CephContext* cct;
-    Mutex lock;
+    ceph::mutex lock = ceph::make_mutex("JOS::SubmitManager::lock");
     uint64_t op_seq;
     uint64_t op_submitted;
   public:
     SubmitManager(CephContext* cct) :
-      cct(cct), lock("JOS::SubmitManager::lock", false, true, false),
+      cct(cct),
       op_seq(0), op_submitted(0)
     {}
     uint64_t op_submit_start();
     void op_submit_finish(uint64_t op);
     void set_op_seq(uint64_t seq) {
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       op_submitted = op_seq = seq;
     }
     uint64_t get_op_seq() {
@@ -53,24 +53,22 @@ protected:
     Journal *&journal;
     Finisher &finisher;
 
-    Mutex apply_lock;
+    ceph::mutex apply_lock = ceph::make_mutex("JOS::ApplyManager::apply_lock");
     bool blocked;
-    Cond blocked_cond;
+    ceph::condition_variable blocked_cond;
     int open_ops;
     uint64_t max_applied_seq;
 
-    Mutex com_lock;
+    ceph::mutex com_lock = ceph::make_mutex("JOS::ApplyManager::com_lock");
     map<version_t, vector<Context*> > commit_waiters;
     uint64_t committing_seq, committed_seq;
 
   public:
     ApplyManager(CephContext* cct, Journal *&j, Finisher &f) :
       cct(cct), journal(j), finisher(f),
-      apply_lock("JOS::ApplyManager::apply_lock", false, true, false),
       blocked(false),
       open_ops(0),
       max_applied_seq(0),
-      com_lock("JOS::ApplyManager::com_lock", false, true, false),
       committing_seq(0), committed_seq(0) {}
     void reset() {
       ceph_assert(open_ops == 0);
@@ -86,25 +84,25 @@ protected:
     void commit_started();
     void commit_finish();
     bool is_committing() {
-      Mutex::Locker l(com_lock);
+      std::lock_guard l{com_lock};
       return committing_seq != committed_seq;
     }
     uint64_t get_committed_seq() {
-      Mutex::Locker l(com_lock);
+      std::lock_guard l{com_lock};
       return committed_seq;
     }
     uint64_t get_committing_seq() {
-      Mutex::Locker l(com_lock);
+      std::lock_guard l{com_lock};
       return committing_seq;
     }
     void init_seq(uint64_t fs_op_seq) {
       {
-       Mutex::Locker l(com_lock);
+       std::lock_guard l{com_lock};
        committed_seq = fs_op_seq;
        committing_seq = fs_op_seq;
       }
       {
-       Mutex::Locker l(apply_lock);
+       std::lock_guard l{apply_lock};
        max_applied_seq = fs_op_seq;
       }
     }
index ba2ed131662381b7175915d489a233aa04b7cf35..eeb3c5da4f25b96558b285bdad04940f327a112e 100644 (file)
@@ -12,11 +12,10 @@ WBThrottle::WBThrottle(CephContext *cct) :
   cct(cct),
   logger(NULL),
   stopping(true),
-  lock("WBThrottle::lock", false, true, false),
   fs(XFS)
 {
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     set_from_conf();
   }
   ceph_assert(cct);
@@ -47,7 +46,7 @@ WBThrottle::~WBThrottle() {
 void WBThrottle::start()
 {
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     stopping = false;
   }
   create("wb_throttle");
@@ -56,9 +55,9 @@ void WBThrottle::start()
 void WBThrottle::stop()
 {
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     stopping = true;
-    cond.Signal();
+    cond.notify_all();
   }
 
   join();
@@ -86,7 +85,7 @@ const char** WBThrottle::get_tracked_conf_keys() const
 
 void WBThrottle::set_from_conf()
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (fs == BTRFS) {
     size_limits.first =
       cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
@@ -116,13 +115,13 @@ void WBThrottle::set_from_conf()
   } else {
     ceph_abort_msg("invalid value for fs");
   }
-  cond.Signal();
+  cond.notify_all();
 }
 
 void WBThrottle::handle_conf_change(const ConfigProxy& conf,
                                    const std::set<std::string> &changed)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   for (const char** i = get_tracked_conf_keys(); *i; ++i) {
     if (changed.count(*i)) {
       set_from_conf();
@@ -132,12 +131,16 @@ void WBThrottle::handle_conf_change(const ConfigProxy& conf,
 }
 
 bool WBThrottle::get_next_should_flush(
+  std::unique_lock<ceph::mutex>& locker,
   boost::tuple<ghobject_t, FDRef, PendingWB> *next)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph_assert(next);
-  while (!stopping && (!beyond_limit() || pending_wbs.empty()))
-         cond.Wait(lock);
+  {
+    cond.wait(locker, [this] {
+      return stopping || (beyond_limit() && !pending_wbs.empty());
+    });
+  }
   if (stopping)
     return false;
   ceph_assert(!pending_wbs.empty());
@@ -153,9 +156,9 @@ bool WBThrottle::get_next_should_flush(
 
 void *WBThrottle::entry()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock l{lock};
   boost::tuple<ghobject_t, FDRef, PendingWB> wb;
-  while (get_next_should_flush(&wb)) {
+  while (get_next_should_flush(l, &wb)) {
     clearing = wb.get<0>();
     cur_ios -= wb.get<2>().ios;
     logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
@@ -165,7 +168,7 @@ void *WBThrottle::entry()
     logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
     logger->dec(l_wbthrottle_inodes_dirtied);
     logger->inc(l_wbthrottle_inodes_wb);
-    lock.Unlock();
+    l.unlock();
 #if defined(HAVE_FDATASYNC)
     int r = ::fdatasync(**wb.get<1>());
 #else
@@ -181,9 +184,9 @@ void *WBThrottle::entry()
       ceph_assert(fa_r == 0);
     }
 #endif
-    lock.Lock();
+    l.lock();
     clearing = ghobject_t();
-    cond.Signal();
+    cond.notify_all();
     wb = boost::tuple<ghobject_t, FDRef, PendingWB>();
   }
   return 0;
@@ -193,7 +196,7 @@ void WBThrottle::queue_wb(
   FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len,
   bool nocache)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
     pending_wbs.find(hoid);
   if (wbiter == pending_wbs.end()) {
@@ -215,12 +218,12 @@ void WBThrottle::queue_wb(
   wbiter->second.first.add(nocache, len, 1);
   insert_object(hoid);
   if (beyond_limit())
-    cond.Signal();
+    cond.notify_all();
 }
 
 void WBThrottle::clear()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
         pending_wbs.begin();
        i != pending_wbs.end();
@@ -240,14 +243,13 @@ void WBThrottle::clear()
   pending_wbs.clear();
   lru.clear();
   rev_lru.clear();
-  cond.Signal();
+  cond.notify_all();
 }
 
 void WBThrottle::clear_object(const ghobject_t &hoid)
 {
-  Mutex::Locker l(lock);
-  while (clearing == hoid)
-    cond.Wait(lock);
+  std::unique_lock l{lock};
+  cond.wait(l, [hoid, this] { return clearing != hoid; });
   ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
     pending_wbs.find(hoid);
   if (i == pending_wbs.end())
@@ -261,12 +263,11 @@ void WBThrottle::clear_object(const ghobject_t &hoid)
 
   pending_wbs.erase(i);
   remove_object(hoid);
-  cond.Signal();
+  cond.notify_all();
 }
 
 void WBThrottle::throttle()
 {
-  Mutex::Locker l(lock);
-  while (!stopping && need_flush())
-    cond.Wait(lock);
+  std::unique_lock l{lock};
+  cond.wait(l, [this] { return stopping || !need_flush(); });
 }
index ef809ea4d77ef3e93bc5b19dc481adfcd6656d93..a7f6e1d65005314ef28c5b938bb01f2e2ab2f4ae 100644 (file)
@@ -79,8 +79,8 @@ class WBThrottle : Thread, public md_config_obs_t {
   CephContext *cct;
   PerfCounters *logger;
   bool stopping;
-  Mutex lock;
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("WBThrottle::lock");
+  ceph::condition_variable cond;
 
 
   /**
@@ -89,7 +89,7 @@ class WBThrottle : Thread, public md_config_obs_t {
   list<ghobject_t> lru;
   ceph::unordered_map<ghobject_t, list<ghobject_t>::iterator> rev_lru;
   void remove_object(const ghobject_t &oid) {
-    ceph_assert(lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(lock));
     ceph::unordered_map<ghobject_t, list<ghobject_t>::iterator>::iterator iter =
       rev_lru.find(oid);
     if (iter == rev_lru.end())
@@ -115,6 +115,7 @@ class WBThrottle : Thread, public md_config_obs_t {
 
   /// get next flush to perform
   bool get_next_should_flush(
+    std::unique_lock<ceph::mutex>& locker,
     boost::tuple<ghobject_t, FDRef, PendingWB> *next ///< [out] next to flush
     ); ///< @return false if we are shutting down
 public:
@@ -152,7 +153,7 @@ public:
   void stop();
   /// Set fs as XFS or BTRFS
   void set_fs(FS new_fs) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     fs = new_fs;
     set_from_conf();
   }
index aafa64e5350bac9d5f634690d216cf7f19fdf3e3..a1852f49f009f13b64a0cabeb8815301787b3202 100644 (file)
@@ -21,7 +21,6 @@
 #include <string>
 
 #include "include/types.h"
-#include "common/Mutex.h"
 #include "common/Cond.h"
 
 class FS {
index eab11add1a27adda89c011bbf663c487646b0cdd..fd206eaa1f04d989ef5e41de4a3324ad8860f507 100644 (file)
@@ -567,7 +567,6 @@ int KStore::OnodeHashLRU::trim(int max)
 KStore::Collection::Collection(KStore *ns, coll_t cid)
   : CollectionImpl(cid),
     store(ns),
-    lock("KStore::Collection::lock", true, false),
     osr(new OpSequencer()),
     onode_map(store->cct)
 {
@@ -588,7 +587,7 @@ KStore::OnodeRef KStore::Collection::get_onode(
   const ghobject_t& oid,
   bool create)
 {
-  ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
+  ceph_assert(create ? ceph_mutex_is_wlocked(lock) : ceph_mutex_is_locked(lock));
 
   spg_t pgid;
   if (cid.is_pg(&pgid)) {
@@ -648,7 +647,6 @@ KStore::KStore(CephContext *cct, const string& path)
     path_fd(-1),
     fsid_fd(-1),
     mounted(false),
-    coll_lock("KStore::coll_lock"),
     nid_last(0),
     nid_max(0),
     throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
@@ -1110,7 +1108,7 @@ ObjectStore::CollectionHandle KStore::open_collection(const coll_t& cid)
 ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid)
 {
   auto *c = new Collection(this, cid);
-  RWLock::WLocker l(coll_lock);
+  std::unique_lock l{coll_lock};
   new_coll_map[cid] = c;
   return c;
 }
@@ -1125,7 +1123,7 @@ int KStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf)
 
 KStore::CollectionRef KStore::_get_collection(coll_t cid)
 {
-  RWLock::RLocker l(coll_lock);
+  std::shared_lock l{coll_lock};
   ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
   if (cp == coll_map.end())
     return CollectionRef();
@@ -1175,7 +1173,7 @@ bool KStore::exists(CollectionHandle& ch, const ghobject_t& oid)
 {
   dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists)
     return false;
@@ -1190,7 +1188,7 @@ int KStore::stat(
 {
   dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists)
     return -ENOENT;
@@ -1221,7 +1219,7 @@ int KStore::read(
           << dendl;
   bl.clear();
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
 
   int r;
 
@@ -1339,7 +1337,7 @@ int KStore::fiemap(
   CollectionRef c = static_cast<Collection*>(ch.get());
   if (!c)
     return -ENOENT;
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
 
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -1373,7 +1371,7 @@ int KStore::getattr(
 {
   dout(15) << __func__ << " " << ch->cid << " " << oid << " " << name << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r;
   string k(name);
 
@@ -1402,7 +1400,7 @@ int KStore::getattrs(
 {
   dout(15) << __func__ << " " << ch->cid << " " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r;
 
   OnodeRef o = c->get_onode(oid, false);
@@ -1420,7 +1418,7 @@ int KStore::getattrs(
 
 int KStore::list_collections(vector<coll_t>& ls)
 {
-  RWLock::RLocker l(coll_lock);
+  std::shared_lock l{coll_lock};
   for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
        p != coll_map.end();
        ++p)
@@ -1430,7 +1428,7 @@ int KStore::list_collections(vector<coll_t>& ls)
 
 bool KStore::collection_exists(const coll_t& c)
 {
-  RWLock::RLocker l(coll_lock);
+  std::shared_lock l{coll_lock};
   return coll_map.count(c);
 }
 
@@ -1455,7 +1453,7 @@ int KStore::collection_bits(CollectionHandle& ch)
 {
   dout(15) << __func__ << " " << ch->cid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
   return c->cnode.bits;
 }
@@ -1471,7 +1469,7 @@ int KStore::collection_list(
            << " start " << start << " end " << end << " max " << max << dendl;
   int r;
   {
-    RWLock::RLocker l(c->lock);
+    std::shared_lock l{c->lock};
     r = _collection_list(c, start, end, max, ls, pnext);
   }
 
@@ -1588,7 +1586,7 @@ KStore::OmapIteratorImpl::OmapIteratorImpl(
   CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
   : c(c), o(o), it(it)
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   if (o->onode.omap_head) {
     get_omap_key(o->onode.omap_head, string(), &head);
     get_omap_tail(o->onode.omap_head, &tail);
@@ -1598,7 +1596,7 @@ KStore::OmapIteratorImpl::OmapIteratorImpl(
 
 int KStore::OmapIteratorImpl::seek_to_first()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   if (o->onode.omap_head) {
     it->lower_bound(head);
   } else {
@@ -1609,7 +1607,7 @@ int KStore::OmapIteratorImpl::seek_to_first()
 
 int KStore::OmapIteratorImpl::upper_bound(const string& after)
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   if (o->onode.omap_head) {
     string key;
     get_omap_key(o->onode.omap_head, after, &key);
@@ -1622,7 +1620,7 @@ int KStore::OmapIteratorImpl::upper_bound(const string& after)
 
 int KStore::OmapIteratorImpl::lower_bound(const string& to)
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   if (o->onode.omap_head) {
     string key;
     get_omap_key(o->onode.omap_head, to, &key);
@@ -1635,7 +1633,7 @@ int KStore::OmapIteratorImpl::lower_bound(const string& to)
 
 bool KStore::OmapIteratorImpl::valid()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
     return true;
   } else {
@@ -1645,7 +1643,7 @@ bool KStore::OmapIteratorImpl::valid()
 
 int KStore::OmapIteratorImpl::next()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   if (o->onode.omap_head) {
     it->next();
     return 0;
@@ -1656,7 +1654,7 @@ int KStore::OmapIteratorImpl::next()
 
 string KStore::OmapIteratorImpl::key()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   ceph_assert(it->valid());
   string db_key = it->raw_key().second;
   string user_key;
@@ -1666,7 +1664,7 @@ string KStore::OmapIteratorImpl::key()
 
 bufferlist KStore::OmapIteratorImpl::value()
 {
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   ceph_assert(it->valid());
   return it->value();
 }
@@ -1680,7 +1678,7 @@ int KStore::omap_get(
 {
   dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -1728,7 +1726,7 @@ int KStore::omap_get_header(
 {
   dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -1760,7 +1758,7 @@ int KStore::omap_get_keys(
 {
   dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -1804,7 +1802,7 @@ int KStore::omap_get_values(
 {
   dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -1838,7 +1836,7 @@ int KStore::omap_check_keys(
 {
   dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   int r = 0;
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
@@ -1874,7 +1872,7 @@ ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
 
   dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
   Collection *c = static_cast<Collection*>(ch.get());
-  RWLock::RLocker l(c->lock);
+  std::shared_lock l{c->lock};
   OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
     dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
@@ -2309,7 +2307,7 @@ void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
     }
 
     // object operations
-    RWLock::WLocker l(c->lock);
+    std::unique_lock l{c->lock};
     OnodeRef &o = ovec[op->oid];
     if (!o) {
       // these operations implicity create the object
@@ -3284,7 +3282,7 @@ int KStore::_create_collection(
   bufferlist bl;
 
   {
-    RWLock::WLocker l(coll_lock);
+    std::unique_lock l{coll_lock};
     if (*c) {
       r = -EEXIST;
       goto out;
@@ -3313,7 +3311,7 @@ int KStore::_remove_collection(TransContext *txc, coll_t cid,
   int r;
 
   {
-    RWLock::WLocker l(coll_lock);
+    std::unique_lock l{coll_lock};
     if (!*c) {
       r = -ENOENT;
       goto out;
@@ -3372,8 +3370,8 @@ int KStore::_split_collection(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
           << " bits " << bits << dendl;
   int r;
-  RWLock::WLocker l(c->lock);
-  RWLock::WLocker l2(d->lock);
+  std::unique_lock l{c->lock};
+  std::unique_lock l2{d->lock};
   c->onode_map.clear();
   d->onode_map.clear();
   c->cnode.bits = bits;
@@ -3397,8 +3395,7 @@ int KStore::_merge_collection(TransContext *txc,
   dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid << " "
           << " bits " << bits << dendl;
   int r;
-  RWLock::WLocker l((*c)->lock);
-  RWLock::WLocker l2(d->lock);
+  std::scoped_lock l{(*c)->lock, d->lock};
   (*c)->onode_map.clear();
   d->onode_map.clear();
   d->cnode.bits = bits;
index 227227fb9be906134d7e03bb64b0de042ef04ce6..4b6f7fe033b491545c2370ed5716b14c06f5b1bb 100644 (file)
@@ -136,7 +136,8 @@ public:
   struct Collection : public CollectionImpl {
     KStore *store;
     kstore_cnode_t cnode;
-    RWLock lock;
+    ceph::shared_mutex lock =
+      ceph::make_shared_mutex("KStore::Collection::lock", true, false);
 
     OpSequencerRef osr;
 
@@ -315,7 +316,8 @@ private:
   int fsid_fd;  ///< open handle (locked) to $path/fsid
   bool mounted;
 
-  RWLock coll_lock;    ///< rwlock to protect coll_map
+  /// rwlock to protect coll_map
+  ceph::shared_mutex coll_lock = ceph::make_shared_mutex("KStore::coll_lock");
   ceph::unordered_map<coll_t, CollectionRef> coll_map;
   map<coll_t,CollectionRef> new_coll_map;