]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 03:18:14 +0000 (11:18 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 03:27:19 +0000 (11:27 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
18 files changed:
src/ceph_mds.cc
src/mds/Beacon.cc
src/mds/JournalPointer.h
src/mds/Locker.cc
src/mds/MDBalancer.cc
src/mds/MDLog.cc
src/mds/MDLog.h
src/mds/MDSContext.cc
src/mds/MDSDaemon.cc
src/mds/MDSDaemon.h
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mds/PurgeQueue.cc
src/mds/PurgeQueue.h
src/mds/ScrubStack.cc
src/mds/Server.cc
src/mgr/DaemonState.cc
src/mgr/DaemonState.h

index 473d2de7eb5daa9a8f5dd6f88f4ba1e2cf0e537c..38e673755a95a470dba6a303ffe5547e0324da23 100644 (file)
@@ -217,8 +217,8 @@ int main(int argc, const char **argv)
  shutdown:
   // yuck: grab the mds lock, so we can be sure that whoever in *mds
   // called shutdown finishes what they were doing.
-  mds->mds_lock.Lock();
-  mds->mds_lock.Unlock();
+  mds->mds_lock.lock();
+  mds->mds_lock.unlock();
 
   pidfile_remove();
 
index 49effebca8d890823c28edb02d22cd1f417d68f3..3ba89c91c49332a0ce31f7ed8eb0bf4b7c83aa73 100644 (file)
@@ -292,7 +292,7 @@ void Beacon::notify_health(MDSRank const *mds)
   }
 
   // I'm going to touch this MDS, so it must be locked
-  ceph_assert(mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
 
   health.metrics.clear();
 
index 0f42326605413ce4bf45cec8f0f37b2fefe47c9f..1881f41d3a0572b6d61ea51a4574e20dd4d08669 100644 (file)
@@ -20,7 +20,6 @@
 #include "mdstypes.h"
 
 class Objecter;
-class Mutex;
 
 // This always lives in the same location for a given MDS
 // instance, it tells the daemon where to look for the journal.
index 08494469d5a24bf9321a92c91b4dffcf7a4af281..910b048a007b74ae7df1da776b1adb738f0ec827 100644 (file)
@@ -1094,7 +1094,7 @@ public:
   C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
     // We are used as an MDSCacheObject waiter, so should
     // only be invoked by someone already holding the big lock.
-    ceph_assert(locker->mds->mds_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
     p->get(MDSCacheObject::PIN_PTRWAITER);    
   }
   void finish(int r) override {
index 42d0a8a1704f604b2297ed92aa2684f5be34ef0a..032053fc5745c28e0537ccc1e287e5c2d259c0f5 100644 (file)
@@ -326,26 +326,26 @@ int MDBalancer::localize_balancer()
   bool ack = false;
   int r = 0;
   bufferlist lua_src;
-  Mutex lock("lock");
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("lock");
+  ceph::condition_variable cond;
 
   /* we assume that balancer is in the metadata pool */
   object_t oid = object_t(mds->mdsmap->get_balancer());
   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
   ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
-                                       new C_SafeCond(&lock, &cond, &ack, &r));
+                                       new C_SafeCond(lock, cond, &ack, &r));
   dout(15) << "launched non-blocking read tid=" << tid
            << " oid=" << oid << " oloc=" << oloc << dendl;
 
   /* timeout: if we waste half our time waiting for RADOS, then abort! */
-  auto bal_interval = g_conf().get_val<int64_t>("mds_bal_interval");
-  lock.Lock();
-  int ret_t = cond.WaitInterval(lock, utime_t(bal_interval / 2, 0));
-  lock.Unlock();
-
+  std::cv_status ret_t = [&] {
+    auto bal_interval = g_conf().get_val<int64_t>("mds_bal_interval");
+    std::unique_lock locker{lock};
+    return cond.wait_for(locker, std::chrono::seconds(bal_interval / 2));
+  }();
   /* success: store the balancer in memory and set the version. */
   if (!r) {
-    if (ret_t == ETIMEDOUT) {
+    if (ret_t == std::cv_status::timeout) {
       mds->objecter->op_cancel(tid, -ECANCELED);
       return -ETIMEDOUT;
     }
index 07fddead765b82a32e44d2ee6b011bf9a698adc4..887f777e338f14e3c93162573d10ab9634f4b257 100644 (file)
@@ -254,7 +254,7 @@ void MDLog::append()
 
 void MDLog::_start_entry(LogEvent *e)
 {
-  ceph_assert(submit_mutex.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
 
   ceph_assert(cur_event == NULL);
   cur_event = e;
@@ -277,7 +277,7 @@ void MDLog::cancel_entry(LogEvent *le)
 
 void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
 {
-  ceph_assert(submit_mutex.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
   ceph_assert(!mds->is_any_replay());
   ceph_assert(!capped);
 
@@ -354,17 +354,17 @@ void MDLog::_submit_thread()
 {
   dout(10) << "_submit_thread start" << dendl;
 
-  submit_mutex.Lock();
+  std::unique_lock locker{submit_mutex};
 
   while (!mds->is_daemon_stopping()) {
     if (g_conf()->mds_log_pause) {
-      submit_cond.Wait(submit_mutex);
+      submit_cond.wait(locker);
       continue;
     }
 
     map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
     if (it == pending_events.end()) {
-      submit_cond.Wait(submit_mutex);
+      submit_cond.wait(locker);
       continue;
     }
 
@@ -377,7 +377,7 @@ void MDLog::_submit_thread()
     PendingEvent data = it->second.front();
     it->second.pop_front();
 
-    submit_mutex.Unlock();
+    locker.unlock();
 
     if (data.le) {
       LogEvent *le = data.le;
@@ -430,28 +430,26 @@ void MDLog::_submit_thread()
        journaler->flush();
     }
 
-    submit_mutex.Lock();
+    locker.lock();
     if (data.flush)
       unflushed = 0;
     else if (data.le)
       unflushed++;
   }
-
-  submit_mutex.Unlock();
 }
 
 void MDLog::wait_for_safe(MDSContext *c)
 {
-  submit_mutex.Lock();
+  submit_mutex.lock();
 
   bool no_pending = true;
   if (!pending_events.empty()) {
     pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
     no_pending = false;
-    submit_cond.Signal();
+    submit_cond.notify_all();
   }
 
-  submit_mutex.Unlock();
+  submit_mutex.unlock();
 
   if (no_pending && c)
     journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
@@ -459,17 +457,17 @@ void MDLog::wait_for_safe(MDSContext *c)
 
 void MDLog::flush()
 {
-  submit_mutex.Lock();
+  submit_mutex.lock();
 
   bool do_flush = unflushed > 0;
   unflushed = 0;
   if (!pending_events.empty()) {
     pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
     do_flush = false;
-    submit_cond.Signal();
+    submit_cond.notify_all();
   }
 
-  submit_mutex.Unlock();
+  submit_mutex.unlock();
 
   if (do_flush)
     journaler->flush();
@@ -478,7 +476,7 @@ void MDLog::flush()
 void MDLog::kick_submitter()
 {
   std::lock_guard l(submit_mutex);
-  submit_cond.Signal();
+  submit_cond.notify_all();
 }
 
 void MDLog::cap()
@@ -489,7 +487,7 @@ void MDLog::cap()
 
 void MDLog::shutdown()
 {
-  ceph_assert(mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
 
   dout(5) << "shutdown" << dendl;
   if (submit_thread.is_started()) {
@@ -500,15 +498,15 @@ void MDLog::shutdown()
       // returning from suicide, and subsequently respect mds->is_daemon_stopping()
       // and fall out of its loop.
     } else {
-      mds->mds_lock.Unlock();
+      mds->mds_lock.unlock();
       // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
       // picking it up will do anything with it.
    
-      submit_mutex.Lock();
-      submit_cond.Signal();
-      submit_mutex.Unlock();
+      submit_mutex.lock();
+      submit_cond.notify_all();
+      submit_mutex.unlock();
 
-      mds->mds_lock.Lock();
+      mds->mds_lock.lock();
 
       submit_thread.join();
     }
@@ -521,15 +519,15 @@ void MDLog::shutdown()
   }
 
   if (replay_thread.is_started() && !replay_thread.am_self()) {
-    mds->mds_lock.Unlock();
+    mds->mds_lock.unlock();
     replay_thread.join();
-    mds->mds_lock.Lock();
+    mds->mds_lock.lock();
   }
 
   if (recovery_thread.is_started() && !recovery_thread.am_self()) {
-    mds->mds_lock.Unlock();
+    mds->mds_lock.unlock();
     recovery_thread.join();
-    mds->mds_lock.Lock();
+    mds->mds_lock.lock();
   }
 }
 
@@ -545,7 +543,7 @@ void MDLog::_start_new_segment()
 
 void MDLog::_prepare_new_segment()
 {
-  ceph_assert(submit_mutex.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
 
   uint64_t seq = event_seq + 1;
   dout(7) << __func__ << " seq " << seq << dendl;
@@ -563,7 +561,7 @@ void MDLog::_prepare_new_segment()
 
 void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
 {
-  ceph_assert(submit_mutex.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
 
   dout(7) << __func__ << dendl;
   ESubtreeMap *sle = mds->mdcache->create_subtree_map();
@@ -600,7 +598,7 @@ void MDLog::trim(int m)
     max_events = g_conf()->mds_log_events_per_segment + 1;
   }
 
-  submit_mutex.Lock();
+  submit_mutex.lock();
 
   // trim!
   dout(10) << "trim " 
@@ -611,7 +609,7 @@ void MDLog::trim(int m)
           << dendl;
 
   if (segments.empty()) {
-    submit_mutex.Unlock();
+    submit_mutex.unlock();
     return;
   }
 
@@ -676,12 +674,12 @@ void MDLog::trim(int m)
       new_expiring_segments++;
       expiring_segments.insert(ls);
       expiring_events += ls->num_events;
-      submit_mutex.Unlock();
+      submit_mutex.unlock();
 
       uint64_t last_seq = ls->seq;
       try_expire(ls, op_prio);
 
-      submit_mutex.Lock();
+      submit_mutex.lock();
       p = segments.lower_bound(last_seq + 1);
     }
   }
@@ -691,10 +689,10 @@ void MDLog::trim(int m)
     uint64_t last_seq = get_last_segment_seq();
     if (mds->mdcache->open_file_table.is_any_dirty() ||
        last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) {
-      submit_mutex.Unlock();
+      submit_mutex.unlock();
       mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
                                           last_seq, CEPH_MSG_PRIO_HIGH);
-      submit_mutex.Lock();
+      submit_mutex.lock();
     }
   }
 
@@ -722,7 +720,7 @@ class C_MaybeExpiredSegment : public MDSInternalContext {
  */
 int MDLog::trim_all()
 {
-  submit_mutex.Lock();
+  submit_mutex.lock();
 
   dout(10) << __func__ << ": "
           << segments.size()
@@ -735,10 +733,10 @@ int MDLog::trim_all()
     if (!capped &&
        !mds->mdcache->open_file_table.is_any_committing() &&
        last_seq > mds->mdcache->open_file_table.get_committing_log_seq()) {
-      submit_mutex.Unlock();
+      submit_mutex.unlock();
       mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
                                           last_seq, CEPH_MSG_PRIO_DEFAULT);
-      submit_mutex.Lock();
+      submit_mutex.lock();
     }
   }
 
@@ -752,7 +750,7 @@ int MDLog::trim_all()
     // Caller should have flushed journaler before calling this
     if (pending_events.count(ls->seq)) {
       dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
-      submit_mutex.Unlock();
+      submit_mutex.unlock();
       return -EAGAIN;
     }
 
@@ -766,12 +764,12 @@ int MDLog::trim_all()
       ceph_assert(expiring_segments.count(ls) == 0);
       expiring_segments.insert(ls);
       expiring_events += ls->num_events;
-      submit_mutex.Unlock();
+      submit_mutex.unlock();
 
       uint64_t next_seq = ls->seq + 1;
       try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
 
-      submit_mutex.Lock();
+      submit_mutex.lock();
       p = segments.lower_bound(next_seq);
     }
   }
@@ -793,12 +791,12 @@ void MDLog::try_expire(LogSegment *ls, int op_prio)
     gather_bld.activate();
   } else {
     dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
-    submit_mutex.Lock();
+    submit_mutex.lock();
     ceph_assert(expiring_segments.count(ls));
     expiring_segments.erase(ls);
     expiring_events -= ls->num_events;
     _expired(ls);
-    submit_mutex.Unlock();
+    submit_mutex.unlock();
   }
   
   logger->set(l_mdl_segexg, expiring_segments.size());
@@ -819,7 +817,7 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
 
 void MDLog::_trim_expired_segments()
 {
-  ceph_assert(submit_mutex.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
 
   uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
 
@@ -863,7 +861,7 @@ void MDLog::_trim_expired_segments()
     trimmed = true;
   }
 
-  submit_mutex.Unlock();
+  submit_mutex.unlock();
 
   if (trimmed)
     journaler->write_head(0);
@@ -871,13 +869,13 @@ void MDLog::_trim_expired_segments()
 
 void MDLog::trim_expired_segments()
 {
-  submit_mutex.Lock();
+  submit_mutex.lock();
   _trim_expired_segments();
 }
 
 void MDLog::_expired(LogSegment *ls)
 {
-  ceph_assert(submit_mutex.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
 
   dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
          << ", " << ls->num_events << " events" << dendl;
index e503e57caf1f3ad18f15e9ad0d85ac3485eeb3ac..261d2419e3f2565db6e06e5af00e60c94a5b0434 100644 (file)
@@ -138,8 +138,8 @@ protected:
 
   int64_t mdsmap_up_features;
   map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
-  Mutex submit_mutex;
-  Cond submit_cond;
+  ceph::mutex submit_mutex = ceph::make_mutex("MDLog::submit_mutex");
+  ceph::condition_variable submit_cond;
 
   void set_safe_pos(uint64_t pos)
   {
@@ -206,7 +206,6 @@ public:
                       recovery_thread(this),
                       event_seq(0), expiring_events(0), expired_events(0),
                      mdsmap_up_features(0),
-                      submit_mutex("MDLog::submit_mutex"),
                       submit_thread(this),
                       cur_event(NULL) { }                
   ~MDLog();
@@ -227,9 +226,10 @@ public:
     _prepare_new_segment();
   }
   void journal_segment_subtree_map(MDSContext *onsync=NULL) {
-    submit_mutex.Lock();
-    _journal_segment_subtree_map(onsync);
-    submit_mutex.Unlock();
+    {
+      std::lock_guard l{submit_mutex};
+      _journal_segment_subtree_map(onsync);
+    }
     if (onsync)
       flush();
   }
@@ -284,13 +284,13 @@ public:
   void submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
     std::lock_guard l(submit_mutex);
     _submit_entry(e, c);
-    submit_cond.Signal();
+    submit_cond.notify_all();
   }
   void start_submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
     std::lock_guard l(submit_mutex);
     _start_entry(e);
     _submit_entry(e, c);
-    submit_cond.Signal();
+    submit_cond.notify_all();
   }
   bool entry_is_open() const { return cur_event != NULL; }
 
index f520737e454fcb7a0d1ca4907e3cb6ee805d4d9b..bdf265965eea2cb032be049896d6c38905818600 100644 (file)
@@ -24,7 +24,7 @@
 void MDSContext::complete(int r) {
   MDSRank *mds = get_mds();
   ceph_assert(mds != nullptr);
-  ceph_assert(mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
   dout(10) << "MDSContext::complete: " << typeid(*this).name() << dendl;
   return Context::complete(r);
 }
index 56038477fe68075a43fde9a234a14619a1c0e16e..0a80bf4db1412de85ff9f4019861dd7ecc479f2c 100644 (file)
@@ -63,7 +63,7 @@
 // cons/des
 MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) :
   Dispatcher(m->cct),
-  mds_lock("MDSDaemon::mds_lock"),
+  mds_lock(ceph::make_mutex("MDSDaemon::mds_lock")),
   stopping(false),
   timer(m->cct, mds_lock),
   gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
@@ -378,9 +378,9 @@ int MDSDaemon::init()
   r = monc->init();
   if (r < 0) {
     derr << "ERROR: failed to init monc: " << cpp_strerror(-r) << dendl;
-    mds_lock.Lock();
+    mds_lock.lock();
     suicide();
-    mds_lock.Unlock();
+    mds_lock.unlock();
     return r;
   }
 
@@ -394,9 +394,9 @@ int MDSDaemon::init()
   r = monc->authenticate();
   if (r < 0) {
     derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
-    mds_lock.Lock();
+    mds_lock.lock();
     suicide();
-    mds_lock.Unlock();
+    mds_lock.unlock();
     return r;
   }
 
@@ -410,19 +410,18 @@ int MDSDaemon::init()
     }
     derr << "ERROR: failed to refresh rotating keys, "
          << "maximum retry time reached." << dendl;
-    mds_lock.Lock();
+    std::lock_guard locker{mds_lock};
     suicide();
-    mds_lock.Unlock();
     return -ETIMEDOUT;
   }
 
   mgrc.init();
   messenger->add_dispatcher_head(&mgrc);
 
-  mds_lock.Lock();
+  mds_lock.lock();
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
     dout(4) << __func__ << ": terminated already, dropping out" << dendl;
-    mds_lock.Unlock();
+    mds_lock.unlock();
     return 0;
   }
 
@@ -430,16 +429,15 @@ int MDSDaemon::init()
   monc->sub_want("mgrmap", 0, 0);
   monc->renew_subs();
 
-  mds_lock.Unlock();
+  mds_lock.unlock();
 
   // Set up admin socket before taking mds_lock, so that ordering
   // is consistent (later we take mds_lock within asok callbacks)
   set_up_admin_socket();
-  mds_lock.Lock();
+  std::lock_guard locker{mds_lock};
   if (beacon.get_want_state() == MDSMap::STATE_DNE) {
     suicide();  // we could do something more graceful here
     dout(4) << __func__ << ": terminated already, dropping out" << dendl;
-    mds_lock.Unlock();
     return 0; 
   }
 
@@ -450,8 +448,6 @@ int MDSDaemon::init()
 
   // schedule tick
   reset_tick();
-  mds_lock.Unlock();
-
   return 0;
 }
 
@@ -464,7 +460,7 @@ void MDSDaemon::reset_tick()
   tick_event = timer.add_event_after(
     g_conf()->mds_tick_interval,
     new FunctionContext([this](int) {
-       ceph_assert(mds_lock.is_locked_by_me());
+       ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
        tick();
       }));
 }
@@ -934,7 +930,7 @@ void MDSDaemon::handle_signal(int signum)
 
 void MDSDaemon::suicide()
 {
-  ceph_assert(mds_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(mds_lock));
   
   // make sure we don't suicide twice
   ceph_assert(stopping == false);
index 13b29d19d6b59c865f62b93cf3dbe44d7519b67b..02794153ec0bce23f925b2c10460140a84622445 100644 (file)
@@ -24,7 +24,7 @@
 #include "messages/MMonCommand.h"
 
 #include "common/LogClient.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Timer.h"
 #include "include/Context.h"
 #include "include/types.h"
@@ -46,7 +46,7 @@ class MDSDaemon : public Dispatcher {
    * also check the `stopping` flag.  If stopping is true, you
    * must either do nothing and immediately drop the lock, or
    * never drop the lock again (i.e. call respawn()) */
-  Mutex        mds_lock;
+  ceph::mutex  mds_lock;
   bool         stopping;
 
   SafeTimer    timer;
index 6cd44270bd7fa6941982c8e76e46e4eadf1bbe5f..3d8996d380ec2dd33974e6d3e952f6c4b791cf98 100644 (file)
@@ -51,7 +51,7 @@ public:
   }
 
   void send() {
-    assert(mds->mds_lock.is_locked());
+    assert(ceph_mutex_is_locked(mds->mds_lock));
 
     dout(20) << __func__ << dendl;
 
@@ -253,7 +253,7 @@ public:
   void send() {
     // not really a hard requirement here, but lets ensure this in
     // case we change the logic here.
-    assert(mds->mds_lock.is_locked());
+    assert(ceph_mutex_is_locked(mds->mds_lock));
 
     dout(20) << __func__ << dendl;
     f->open_object_section("result");
@@ -270,7 +270,6 @@ private:
     C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
       : MDSInternalContext(mds),
         timeout(timeout),
-        lock("mds::context::timeout", false, true),
         on_finish(on_finish) {
     }
     ~C_ContextTimeout() {
@@ -308,7 +307,7 @@ private:
     }
 
     uint64_t timeout;
-    Mutex lock;
+    ceph::mutex lock = ceph::make_mutex("mds::context::timeout");
     Context *on_finish = nullptr;
     Context *timer_task = nullptr;
   };
@@ -473,7 +472,7 @@ private:
 
 MDSRank::MDSRank(
     mds_rank_t whoami_,
-    Mutex &mds_lock_,
+    ceph::mutex &mds_lock_,
     LogChannelRef &clog_,
     SafeTimer &timer_,
     Beacon &beacon_,
@@ -831,9 +830,9 @@ void MDSRankDispatcher::shutdown()
 
   purge_queue.shutdown();
 
-  mds_lock.Unlock();
+  mds_lock.unlock();
   finisher->stop(); // no flushing
-  mds_lock.Lock();
+  mds_lock.lock();
 
   if (objecter->initialized)
     objecter->shutdown();
@@ -846,12 +845,12 @@ void MDSRankDispatcher::shutdown()
 
   // release mds_lock for finisher/messenger threads (e.g.
   // MDSDaemon::ms_handle_reset called from Messenger).
-  mds_lock.Unlock();
+  mds_lock.unlock();
 
   // shut down messenger
   messenger->shutdown();
 
-  mds_lock.Lock();
+  mds_lock.lock();
 
   // Workaround unclean shutdown: HeartbeatMap will assert if
   // worker is not removed (as we do in ~MDS), but ~MDS is not
@@ -926,7 +925,7 @@ void MDSRank::respawn()
 void MDSRank::damaged()
 {
   ceph_assert(whoami != MDS_RANK_NONE);
-  ceph_assert(mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
 
   beacon.set_want_state(*mdsmap, MDSMap::STATE_DAMAGED);
   monc->flush_log();  // Flush any clog error from before we were called
@@ -968,13 +967,13 @@ void MDSRank::handle_write_error(int err)
 
 void *MDSRank::ProgressThread::entry()
 {
-  std::lock_guard l(mds->mds_lock);
+  std::unique_lock l(mds->mds_lock);
   while (true) {
-    while (!mds->stopping &&
-          mds->finished_queue.empty() &&
-          (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
-      cond.Wait(mds->mds_lock);
-    }
+    cond.wait(l, [this] {
+      return (mds->stopping ||
+             !mds->finished_queue.empty() ||
+             (!mds->waiting_for_nolaggy.empty() && !mds->beacon.is_laggy()));
+    });
 
     if (mds->stopping) {
       break;
@@ -989,18 +988,18 @@ void *MDSRank::ProgressThread::entry()
 
 void MDSRank::ProgressThread::shutdown()
 {
-  ceph_assert(mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
   ceph_assert(mds->stopping);
 
   if (am_self()) {
     // Stopping is set, we will fall out of our main loop naturally
   } else {
     // Kick the thread to notice mds->stopping, and join it
-    cond.Signal();
-    mds->mds_lock.Unlock();
+    cond.notify_all();
+    mds->mds_lock.unlock();
     if (is_started())
       join();
-    mds->mds_lock.Lock();
+    mds->mds_lock.lock();
   }
 }
 
@@ -1228,7 +1227,7 @@ bool MDSRank::handle_deferrable_message(const cref_t<Message> &m)
  */
 void MDSRank::_advance_queues()
 {
-  ceph_assert(mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
 
   if (!finished_queue.empty()) {
     dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
@@ -1617,7 +1616,7 @@ void MDSRank::boot_start(BootStep step, int r)
 
 void MDSRank::validate_sessions()
 {
-  ceph_assert(mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
   bool valid = true;
 
   // Identify any sessions which have state inconsistent with other,
@@ -2458,11 +2457,10 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
       ss << "no target epoch given";
       return true;
     }
-
-    mds_lock.Lock();
-    set_osd_epoch_barrier(target_epoch);
-    mds_lock.Unlock();
-
+    {
+      std::lock_guard l(mds_lock);
+      set_osd_epoch_barrier(target_epoch);
+    }
     C_SaferCond cond;
     bool already_got = objecter->wait_for_map(target_epoch, &cond);
     if (!already_got) {
@@ -2482,8 +2480,7 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
       ss << "Invalid client_id specified";
       return true;
     }
-
-    mds_lock.Lock();
+    std::lock_guard l(mds_lock);
     std::stringstream dss;
     bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
         g_conf()->mds_session_blacklist_on_evict, dss);
@@ -2491,7 +2488,6 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
       dout(15) << dss.str() << dendl;
       ss << dss.str();
     }
-    mds_lock.Unlock();
   } else if (command == "session config") {
     int64_t client_id;
     std::string option;
@@ -2501,9 +2497,8 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
     cmd_getval(g_ceph_context, cmdmap, "option", option);
     bool got_value = cmd_getval(g_ceph_context, cmdmap, "value", value);
 
-    mds_lock.Lock();
+    std::lock_guard l(mds_lock);
     config_client(client_id, !got_value, option, value, ss);
-    mds_lock.Unlock();
   } else if (command == "scrub_path") {
     string path;
     vector<string> scrubop_vec;
@@ -3369,7 +3364,7 @@ bool MDSRank::evict_client(int64_t session_id,
     bool wait, bool blacklist, std::ostream& err_ss,
     Context *on_killed)
 {
-  ceph_assert(mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
 
   // Mutually exclusive args
   ceph_assert(!(wait && on_killed != nullptr));
@@ -3405,7 +3400,7 @@ bool MDSRank::evict_client(int64_t session_id,
   std::vector<std::string> cmd = {tmp};
 
   auto kill_client_session = [this, session_id, wait, on_killed](){
-    ceph_assert(mds_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
     Session *session = sessionmap.get_session(
         entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
     if (session) {
@@ -3415,9 +3410,9 @@ bool MDSRank::evict_client(int64_t session_id,
         C_SaferCond on_safe;
         server->kill_session(session, &on_safe);
 
-        mds_lock.Unlock();
+        mds_lock.unlock();
         on_safe.wait();
-        mds_lock.Lock();
+        mds_lock.lock();
       }
     } else {
       dout(1) << "session " << session_id << " was removed while we waited "
@@ -3432,7 +3427,7 @@ bool MDSRank::evict_client(int64_t session_id,
   };
 
   auto apply_blacklist = [this, cmd](std::function<void ()> fn){
-    ceph_assert(mds_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
 
     Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
       objecter->wait_for_latest_osdmap(
@@ -3458,9 +3453,9 @@ bool MDSRank::evict_client(int64_t session_id,
     if (blacklist) {
       C_SaferCond inline_ctx;
       apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});
-      mds_lock.Unlock();
+      mds_lock.unlock();
       inline_ctx.wait();
-      mds_lock.Lock();
+      mds_lock.lock();
     }
 
     // We dropped mds_lock, so check that session still exists
@@ -3505,7 +3500,7 @@ Context *MDSRank::create_async_exec_context(C_ExecAndReply *ctx) {
 
 MDSRankDispatcher::MDSRankDispatcher(
     mds_rank_t whoami_,
-    Mutex &mds_lock_,
+    ceph::mutex &mds_lock_,
     LogChannelRef &clog_,
     SafeTimer &timer_,
     Beacon &beacon_,
index 8dc422966cb04ff96ed04850a046af0cd7c22b31..50d25fc6a20cc7d053f9887e556b2a91620ba7d3 100644 (file)
@@ -149,7 +149,7 @@ class MDSRank {
     // Reference to global MDS::mds_lock, so that users of MDSRank don't
     // carry around references to the outer MDS, and we can substitute
     // a separate lock here in future potentially.
-    Mutex &mds_lock;
+    ceph::mutex &mds_lock;
 
     mono_time get_starttime() const {
       return starttime;
@@ -243,12 +243,12 @@ class MDSRank {
 
     class ProgressThread : public Thread {
       MDSRank *mds;
-      Cond cond;
+      ceph::condition_variable cond;
       public:
       explicit ProgressThread(MDSRank *mds_) : mds(mds_) {}
       void * entry() override;
       void shutdown();
-      void signal() {cond.Signal();}
+      void signal() {cond.notify_all();}
     } progress_thread;
 
   list<cref_t<Message>> waiting_for_nolaggy;
@@ -325,7 +325,7 @@ class MDSRank {
 
     MDSRank(
         mds_rank_t whoami_,
-        Mutex &mds_lock_,
+        ceph::mutex &mds_lock_,
         LogChannelRef &clog_,
         SafeTimer &timer_,
         Beacon &beacon_,
@@ -639,7 +639,7 @@ public:
 
   MDSRankDispatcher(
       mds_rank_t whoami_,
-      Mutex &mds_lock_,
+      ceph::mutex &mds_lock_,
       LogChannelRef &clog_,
       SafeTimer &timer_,
       Beacon &beacon_,
index 372666b1aa3f315077c63601027c801dc1b47d0e..a7d5c7c1981001ce2561e7099ed72e4dcb8497ef 100644 (file)
@@ -81,7 +81,6 @@ PurgeQueue::PurgeQueue(
   :
     cct(cct_),
     rank(rank_),
-    lock("PurgeQueue"),
     metadata_pool(metadata_pool_),
     finisher(cct, "PurgeQueue", "PQ_Finisher"),
     timer(cct, lock),
@@ -231,7 +230,7 @@ void PurgeQueue::wait_for_recovery(Context* c)
 
 void PurgeQueue::_recover()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   // Journaler::is_readable() adjusts write_pos if partial entry is encountered
   while (1) {
@@ -412,7 +411,7 @@ void PurgeQueue::_go_readonly(int r)
 
 bool PurgeQueue::_consume()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   bool could_consume = false;
   while(_can_consume()) {
@@ -478,7 +477,7 @@ void PurgeQueue::_execute_item(
     const PurgeItem &item,
     uint64_t expire_to)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   in_flight[expire_to] = item;
   logger->set(l_pq_executing, in_flight.size());
@@ -582,7 +581,7 @@ void PurgeQueue::_execute_item(
 void PurgeQueue::_execute_item_complete(
     uint64_t expire_to)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
   dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
   ceph_assert(in_flight.count(expire_to) == 1);
 
index c1d2de46eef3f6999137d183742397833f29391e..7f42ac05be92f45509a7813a77ffda2cbb2d9c0c 100644 (file)
@@ -106,7 +106,7 @@ class PurgeQueue
 protected:
   CephContext *cct;
   const mds_rank_t rank;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("PurgeQueue");
   bool readonly = false;
 
   int64_t metadata_pool;
index 32205afc219f39c00f060e5ba01d1d2aae0e256e..6e731b6591d71e7c9a1298769cfa84301bece885 100644 (file)
@@ -85,7 +85,7 @@ void ScrubStack::_enqueue_inode(CInode *in, CDentry *parent,
 {
   dout(10) << __func__ << " with {" << *in << "}"
            << ", on_finish=" << on_finish << ", top=" << top << dendl;
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   in->scrub_initialize(parent, header, on_finish);
   if (top)
     push_inode(in);
@@ -108,7 +108,7 @@ void ScrubStack::enqueue_inode(CInode *in, ScrubHeaderRef& header,
 
 void ScrubStack::kick_off_scrubs()
 {
-  ceph_assert(mdcache->mds->mds_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
   dout(20) << __func__ << ": state=" << state << dendl;
 
   if (clear_inode_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
@@ -499,7 +499,7 @@ ScrubStack::C_KickOffScrubs::C_KickOffScrubs(MDCache *mdcache, ScrubStack *s)
   : MDSInternalContext(mdcache->mds), stack(s) { }
 
 void ScrubStack::complete_control_contexts(int r) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
 
   for (auto &ctx : control_ctxs) {
     ctx->complete(r);
@@ -516,7 +516,7 @@ void ScrubStack::set_state(State next_state) {
 }
 
 bool ScrubStack::scrub_in_transition_state() {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   dout(20) << __func__ << ": state=" << state << dendl;
 
   // STATE_RUNNING is considered as a transition state so as to
@@ -529,7 +529,7 @@ bool ScrubStack::scrub_in_transition_state() {
 }
 
 void ScrubStack::scrub_status(Formatter *f) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
 
   f->open_object_section("result");
 
@@ -600,7 +600,7 @@ void ScrubStack::scrub_status(Formatter *f) {
 }
 
 void ScrubStack::abort_pending_scrubs() {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   ceph_assert(clear_inode_stack);
 
   for (auto inode = inode_stack.begin(); !inode.end(); ++inode) {
@@ -622,7 +622,7 @@ void ScrubStack::abort_pending_scrubs() {
 }
 
 void ScrubStack::scrub_abort(Context *on_finish) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   ceph_assert(on_finish != nullptr);
 
   dout(10) << __func__ << ": aborting with " << scrubs_in_progress
@@ -643,7 +643,7 @@ void ScrubStack::scrub_abort(Context *on_finish) {
 }
 
 void ScrubStack::scrub_pause(Context *on_finish) {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   ceph_assert(on_finish != nullptr);
 
   dout(10) << __func__ << ": pausing with " << scrubs_in_progress
@@ -668,7 +668,7 @@ void ScrubStack::scrub_pause(Context *on_finish) {
 }
 
 bool ScrubStack::scrub_resume() {
-  ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
   dout(20) << __func__ << ": state=" << state << dendl;
 
   int r = 0;
index 92ad6e8a5c9de2e6cf3e195e6abee4bc44dc27a0..ebaa1fa6aa017c439f846276e3e20b2ffc22ce5d 100644 (file)
@@ -381,7 +381,7 @@ void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaim
     if (reply) {
       int64_t session_id = session->get_client().v;
       send_reply = new FunctionContext([this, session_id, reply](int r) {
-           assert(mds->mds_lock.is_locked_by_me());
+           assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
            Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(session_id));
            if (!session) {
              return;
@@ -1098,7 +1098,7 @@ void Server::handle_conf_change(const std::set<std::string>& changed) {
  */
 void Server::kill_session(Session *session, Context *on_safe)
 {
-  ceph_assert(mds->mds_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
 
   if ((session->is_opening() ||
        session->is_open() ||
index 10e526f8d562dd3bdd78c8c35609c0614a80a51e..3e3a30aeeb8d0689602d0e3a951dab879184bbb2 100644 (file)
@@ -134,7 +134,7 @@ void DeviceState::print(ostream& out) const
 
 void DaemonStateIndex::insert(DaemonStatePtr dm)
 {
-  RWLock::WLocker l(lock);
+  std::unique_lock l{lock};
   _insert(dm);
 }
 
@@ -184,7 +184,7 @@ void DaemonStateIndex::_erase(const DaemonKey& dmk)
 DaemonStateCollection DaemonStateIndex::get_by_service(
   const std::string& svc) const
 {
-  RWLock::RLocker l(lock);
+  std::shared_lock l{lock};
 
   DaemonStateCollection result;
 
@@ -200,7 +200,7 @@ DaemonStateCollection DaemonStateIndex::get_by_service(
 DaemonStateCollection DaemonStateIndex::get_by_server(
   const std::string &hostname) const
 {
-  RWLock::RLocker l(lock);
+  std::shared_lock l{lock};
 
   if (by_server.count(hostname)) {
     return by_server.at(hostname);
@@ -211,14 +211,14 @@ DaemonStateCollection DaemonStateIndex::get_by_server(
 
 bool DaemonStateIndex::exists(const DaemonKey &key) const
 {
-  RWLock::RLocker l(lock);
+  std::shared_lock l{lock};
 
   return all.count(key) > 0;
 }
 
 DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key)
 {
-  RWLock::RLocker l(lock);
+  std::shared_lock l{lock};
 
   auto iter = all.find(key);
   if (iter != all.end()) {
@@ -230,7 +230,7 @@ DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key)
 
 void DaemonStateIndex::rm(const DaemonKey &key)
 {
-  RWLock::WLocker l(lock);
+  std::unique_lock l{lock};
   _rm(key);
 }
 
@@ -246,7 +246,7 @@ void DaemonStateIndex::cull(const std::string& svc_name,
 {
   std::vector<string> victims;
 
-  RWLock::WLocker l(lock);
+  std::unique_lock l{lock};
   auto begin = all.lower_bound({svc_name, ""});
   auto end = all.end();
   for (auto &i = begin; i != end; ++i) {
index ce29e9cd9030a5ab10c0c1f7c8638c557494f7fb..17ede40aa15130c6c49edcd971e137a7a595d1c3 100644 (file)
@@ -126,7 +126,7 @@ class DaemonPerfCounters
 class DaemonState
 {
   public:
-  Mutex lock = {"DaemonState::lock"};
+  ceph::mutex lock = ceph::make_mutex("DaemonState::lock");
 
   DaemonKey key;
 
@@ -240,7 +240,8 @@ typedef boost::intrusive_ptr<DeviceState> DeviceStateRef;
 class DaemonStateIndex
 {
 private:
-  mutable RWLock lock = {"DaemonStateIndex", true, true, true};
+  mutable ceph::shared_mutex lock =
+    ceph::make_shared_mutex("DaemonStateIndex", true, true, true);
 
   std::map<std::string, DaemonStateCollection> by_server;
   DaemonStateCollection all;
@@ -286,7 +287,7 @@ public:
   template<typename Callback, typename...Args>
   auto with_daemons_by_server(Callback&& cb, Args&&... args) const ->
     decltype(cb(by_server, std::forward<Args>(args)...)) {
-    RWLock::RLocker l(lock);
+    std::shared_lock l{lock};
     
     return std::forward<Callback>(cb)(by_server, std::forward<Args>(args)...);
   }
@@ -294,7 +295,7 @@ public:
   template<typename Callback, typename...Args>
   bool with_device(const std::string& dev,
                   Callback&& cb, Args&&... args) const {
-    RWLock::RLocker l(lock);
+    std::shared_lock l{lock};
     auto p = devices.find(dev);
     if (p == devices.end()) {
       return false;
@@ -306,7 +307,7 @@ public:
   template<typename Callback, typename...Args>
   bool with_device_write(const std::string& dev,
                         Callback&& cb, Args&&... args) {
-    RWLock::WLocker l(lock);
+    std::unique_lock l{lock};
     auto p = devices.find(dev);
     if (p == devices.end()) {
       return false;
@@ -321,14 +322,14 @@ public:
   template<typename Callback, typename...Args>
   void with_device_create(const std::string& dev,
                          Callback&& cb, Args&&... args) {
-    RWLock::WLocker l(lock);
+    std::unique_lock l{lock};
     auto d = _get_or_create_device(dev);
     std::forward<Callback>(cb)(*d, std::forward<Args>(args)...);
   }
 
   template<typename Callback, typename...Args>
   void with_devices(Callback&& cb, Args&&... args) const {
-    RWLock::RLocker l(lock);
+    std::shared_lock l{lock};
     for (auto& i : devices) {
       std::forward<Callback>(cb)(*i.second, std::forward<Args>(args)...);
     }
@@ -338,7 +339,7 @@ public:
   void with_devices2(CallbackInitial&& cbi,  // with lock taken
                     Callback&& cb,          // for each device
                     Args&&... args) const {
-    RWLock::RLocker l(lock);
+    std::shared_lock l{lock};
     cbi();
     for (auto& i : devices) {
       std::forward<Callback>(cb)(*i.second, std::forward<Args>(args)...);
@@ -357,25 +358,25 @@ public:
   }
 
   void notify_updating(const DaemonKey &k) {
-    RWLock::WLocker l(lock);
+    std::unique_lock l{lock};
     updating.insert(k);
   }
   void clear_updating(const DaemonKey &k) {
-    RWLock::WLocker l(lock);
+    std::unique_lock l{lock};
     updating.erase(k);
   }
   bool is_updating(const DaemonKey &k) {
-    RWLock::RLocker l(lock);
+    std::shared_lock l{lock};
     return updating.count(k) > 0;
   }
 
   void update_metadata(DaemonStatePtr state,
                       const map<string,string>& meta) {
     // remove and re-insert in case the device metadata changed
-    RWLock::WLocker l(lock);
+    std::unique_lock l{lock};
     _rm(state->key);
     {
-      Mutex::Locker l2(state->lock);
+      std::lock_guard l2{state->lock};
       state->set_metadata(meta);
     }
     _insert(state);