From 675606bf712ee2a22b6dbc6b568d79d9de05539f Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 7 Jul 2019 11:18:14 +0800 Subject: [PATCH] mds: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/ceph_mds.cc | 4 +- src/mds/Beacon.cc | 2 +- src/mds/JournalPointer.h | 1 - src/mds/Locker.cc | 2 +- src/mds/MDBalancer.cc | 18 ++++---- src/mds/MDLog.cc | 90 ++++++++++++++++++++-------------------- src/mds/MDLog.h | 16 +++---- src/mds/MDSContext.cc | 2 +- src/mds/MDSDaemon.cc | 28 ++++++------- src/mds/MDSDaemon.h | 4 +- src/mds/MDSRank.cc | 75 ++++++++++++++++----------------- src/mds/MDSRank.h | 10 ++--- src/mds/PurgeQueue.cc | 9 ++-- src/mds/PurgeQueue.h | 2 +- src/mds/ScrubStack.cc | 18 ++++---- src/mds/Server.cc | 4 +- src/mgr/DaemonState.cc | 14 +++---- src/mgr/DaemonState.h | 27 ++++++------ 18 files changed, 157 insertions(+), 169 deletions(-) diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 473d2de7eb5da..38e673755a95a 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -217,8 +217,8 @@ int main(int argc, const char **argv) shutdown: // yuck: grab the mds lock, so we can be sure that whoever in *mds // called shutdown finishes what they were doing. - mds->mds_lock.Lock(); - mds->mds_lock.Unlock(); + mds->mds_lock.lock(); + mds->mds_lock.unlock(); pidfile_remove(); diff --git a/src/mds/Beacon.cc b/src/mds/Beacon.cc index 49effebca8d89..3ba89c91c4933 100644 --- a/src/mds/Beacon.cc +++ b/src/mds/Beacon.cc @@ -292,7 +292,7 @@ void Beacon::notify_health(MDSRank const *mds) } // I'm going to touch this MDS, so it must be locked - ceph_assert(mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); health.metrics.clear(); diff --git a/src/mds/JournalPointer.h b/src/mds/JournalPointer.h index 0f42326605413..1881f41d3a057 100644 --- a/src/mds/JournalPointer.h +++ b/src/mds/JournalPointer.h @@ -20,7 +20,6 @@ #include "mdstypes.h" class Objecter; -class Mutex; // This always lives in the same location for a given MDS // instance, it tells the daemon where to look for the journal. diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 08494469d5a24..910b048a007b7 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -1094,7 +1094,7 @@ public: C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) { // We are used as an MDSCacheObject waiter, so should // only be invoked by someone already holding the big lock. - ceph_assert(locker->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock)); p->get(MDSCacheObject::PIN_PTRWAITER); } void finish(int r) override { diff --git a/src/mds/MDBalancer.cc b/src/mds/MDBalancer.cc index 42d0a8a1704f6..032053fc5745c 100644 --- a/src/mds/MDBalancer.cc +++ b/src/mds/MDBalancer.cc @@ -326,26 +326,26 @@ int MDBalancer::localize_balancer() bool ack = false; int r = 0; bufferlist lua_src; - Mutex lock("lock"); - Cond cond; + ceph::mutex lock = ceph::make_mutex("lock"); + ceph::condition_variable cond; /* we assume that balancer is in the metadata pool */ object_t oid = object_t(mds->mdsmap->get_balancer()); object_locator_t oloc(mds->mdsmap->get_metadata_pool()); ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0, - new C_SafeCond(&lock, &cond, &ack, &r)); + new C_SafeCond(lock, cond, &ack, &r)); dout(15) << "launched non-blocking read tid=" << tid << " oid=" << oid << " oloc=" << oloc << dendl; /* timeout: if we waste half our time waiting for RADOS, then abort! 
*/ - auto bal_interval = g_conf().get_val("mds_bal_interval"); - lock.Lock(); - int ret_t = cond.WaitInterval(lock, utime_t(bal_interval / 2, 0)); - lock.Unlock(); - + std::cv_status ret_t = [&] { + auto bal_interval = g_conf().get_val("mds_bal_interval"); + std::unique_lock locker{lock}; + return cond.wait_for(locker, std::chrono::seconds(bal_interval / 2)); + }(); /* success: store the balancer in memory and set the version. */ if (!r) { - if (ret_t == ETIMEDOUT) { + if (ret_t == std::cv_status::timeout) { mds->objecter->op_cancel(tid, -ECANCELED); return -ETIMEDOUT; } diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 07fddead765b8..887f777e338f1 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -254,7 +254,7 @@ void MDLog::append() void MDLog::_start_entry(LogEvent *e) { - ceph_assert(submit_mutex.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); ceph_assert(cur_event == NULL); cur_event = e; @@ -277,7 +277,7 @@ void MDLog::cancel_entry(LogEvent *le) void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c) { - ceph_assert(submit_mutex.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); ceph_assert(!mds->is_any_replay()); ceph_assert(!capped); @@ -354,17 +354,17 @@ void MDLog::_submit_thread() { dout(10) << "_submit_thread start" << dendl; - submit_mutex.Lock(); + std::unique_lock locker{submit_mutex}; while (!mds->is_daemon_stopping()) { if (g_conf()->mds_log_pause) { - submit_cond.Wait(submit_mutex); + submit_cond.wait(locker); continue; } map >::iterator it = pending_events.begin(); if (it == pending_events.end()) { - submit_cond.Wait(submit_mutex); + submit_cond.wait(locker); continue; } @@ -377,7 +377,7 @@ void MDLog::_submit_thread() PendingEvent data = it->second.front(); it->second.pop_front(); - submit_mutex.Unlock(); + locker.unlock(); if (data.le) { LogEvent *le = data.le; @@ -430,28 +430,26 @@ void MDLog::_submit_thread() journaler->flush(); } - submit_mutex.Lock(); + locker.lock(); if (data.flush) unflushed = 0; else if (data.le) unflushed++; } - - submit_mutex.Unlock(); } void MDLog::wait_for_safe(MDSContext *c) { - submit_mutex.Lock(); + submit_mutex.lock(); bool no_pending = true; if (!pending_events.empty()) { pending_events.rbegin()->second.push_back(PendingEvent(NULL, c)); no_pending = false; - submit_cond.Signal(); + submit_cond.notify_all(); } - submit_mutex.Unlock(); + submit_mutex.unlock(); if (no_pending && c) journaler->wait_for_flush(new C_IO_Wrapper(mds, c)); @@ -459,17 +457,17 @@ void MDLog::wait_for_safe(MDSContext *c) void MDLog::flush() { - submit_mutex.Lock(); + submit_mutex.lock(); bool do_flush = unflushed > 0; unflushed = 0; if (!pending_events.empty()) { pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true)); do_flush = false; - submit_cond.Signal(); + submit_cond.notify_all(); } - submit_mutex.Unlock(); + submit_mutex.unlock(); if (do_flush) journaler->flush(); @@ -478,7 +476,7 @@ void MDLog::flush() void MDLog::kick_submitter() { std::lock_guard l(submit_mutex); - submit_cond.Signal(); + submit_cond.notify_all(); } void MDLog::cap() @@ -489,7 +487,7 @@ void MDLog::cap() void MDLog::shutdown() { - ceph_assert(mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); dout(5) << "shutdown" << dendl; if (submit_thread.is_started()) { @@ -500,15 +498,15 @@ void MDLog::shutdown() // returning from suicide, and subsequently respect mds->is_daemon_stopping() // and fall out of its loop. 
} else { - mds->mds_lock.Unlock(); + mds->mds_lock.unlock(); // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else // picking it up will do anything with it. - submit_mutex.Lock(); - submit_cond.Signal(); - submit_mutex.Unlock(); + submit_mutex.lock(); + submit_cond.notify_all(); + submit_mutex.unlock(); - mds->mds_lock.Lock(); + mds->mds_lock.lock(); submit_thread.join(); } @@ -521,15 +519,15 @@ void MDLog::shutdown() } if (replay_thread.is_started() && !replay_thread.am_self()) { - mds->mds_lock.Unlock(); + mds->mds_lock.unlock(); replay_thread.join(); - mds->mds_lock.Lock(); + mds->mds_lock.lock(); } if (recovery_thread.is_started() && !recovery_thread.am_self()) { - mds->mds_lock.Unlock(); + mds->mds_lock.unlock(); recovery_thread.join(); - mds->mds_lock.Lock(); + mds->mds_lock.lock(); } } @@ -545,7 +543,7 @@ void MDLog::_start_new_segment() void MDLog::_prepare_new_segment() { - ceph_assert(submit_mutex.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); uint64_t seq = event_seq + 1; dout(7) << __func__ << " seq " << seq << dendl; @@ -563,7 +561,7 @@ void MDLog::_prepare_new_segment() void MDLog::_journal_segment_subtree_map(MDSContext *onsync) { - ceph_assert(submit_mutex.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); dout(7) << __func__ << dendl; ESubtreeMap *sle = mds->mdcache->create_subtree_map(); @@ -600,7 +598,7 @@ void MDLog::trim(int m) max_events = g_conf()->mds_log_events_per_segment + 1; } - submit_mutex.Lock(); + submit_mutex.lock(); // trim! dout(10) << "trim " @@ -611,7 +609,7 @@ void MDLog::trim(int m) << dendl; if (segments.empty()) { - submit_mutex.Unlock(); + submit_mutex.unlock(); return; } @@ -676,12 +674,12 @@ void MDLog::trim(int m) new_expiring_segments++; expiring_segments.insert(ls); expiring_events += ls->num_events; - submit_mutex.Unlock(); + submit_mutex.unlock(); uint64_t last_seq = ls->seq; try_expire(ls, op_prio); - submit_mutex.Lock(); + submit_mutex.lock(); p = segments.lower_bound(last_seq + 1); } } @@ -691,10 +689,10 @@ void MDLog::trim(int m) uint64_t last_seq = get_last_segment_seq(); if (mds->mdcache->open_file_table.is_any_dirty() || last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) { - submit_mutex.Unlock(); + submit_mutex.unlock(); mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq), last_seq, CEPH_MSG_PRIO_HIGH); - submit_mutex.Lock(); + submit_mutex.lock(); } } @@ -722,7 +720,7 @@ class C_MaybeExpiredSegment : public MDSInternalContext { */ int MDLog::trim_all() { - submit_mutex.Lock(); + submit_mutex.lock(); dout(10) << __func__ << ": " << segments.size() @@ -735,10 +733,10 @@ int MDLog::trim_all() if (!capped && !mds->mdcache->open_file_table.is_any_committing() && last_seq > mds->mdcache->open_file_table.get_committing_log_seq()) { - submit_mutex.Unlock(); + submit_mutex.unlock(); mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq), last_seq, CEPH_MSG_PRIO_DEFAULT); - submit_mutex.Lock(); + submit_mutex.lock(); } } @@ -752,7 +750,7 @@ int MDLog::trim_all() // Caller should have flushed journaler before calling this if (pending_events.count(ls->seq)) { dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl; - submit_mutex.Unlock(); + submit_mutex.unlock(); return -EAGAIN; } @@ -766,12 +764,12 @@ int MDLog::trim_all() ceph_assert(expiring_segments.count(ls) == 0); expiring_segments.insert(ls); expiring_events += ls->num_events; - submit_mutex.Unlock(); + submit_mutex.unlock(); 
uint64_t next_seq = ls->seq + 1; try_expire(ls, CEPH_MSG_PRIO_DEFAULT); - submit_mutex.Lock(); + submit_mutex.lock(); p = segments.lower_bound(next_seq); } } @@ -793,12 +791,12 @@ void MDLog::try_expire(LogSegment *ls, int op_prio) gather_bld.activate(); } else { dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl; - submit_mutex.Lock(); + submit_mutex.lock(); ceph_assert(expiring_segments.count(ls)); expiring_segments.erase(ls); expiring_events -= ls->num_events; _expired(ls); - submit_mutex.Unlock(); + submit_mutex.unlock(); } logger->set(l_mdl_segexg, expiring_segments.size()); @@ -819,7 +817,7 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio) void MDLog::_trim_expired_segments() { - ceph_assert(submit_mutex.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq(); @@ -863,7 +861,7 @@ void MDLog::_trim_expired_segments() trimmed = true; } - submit_mutex.Unlock(); + submit_mutex.unlock(); if (trimmed) journaler->write_head(0); @@ -871,13 +869,13 @@ void MDLog::_trim_expired_segments() void MDLog::trim_expired_segments() { - submit_mutex.Lock(); + submit_mutex.lock(); _trim_expired_segments(); } void MDLog::_expired(LogSegment *ls) { - ceph_assert(submit_mutex.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); dout(5) << "_expired segment " << ls->seq << "/" << ls->offset << ", " << ls->num_events << " events" << dendl; diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index e503e57caf1f3..261d2419e3f25 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -138,8 +138,8 @@ protected: int64_t mdsmap_up_features; map > pending_events; // log segment -> event list - Mutex submit_mutex; - Cond submit_cond; + ceph::mutex submit_mutex = ceph::make_mutex("MDLog::submit_mutex"); + ceph::condition_variable submit_cond; void set_safe_pos(uint64_t pos) { @@ -206,7 +206,6 @@ public: recovery_thread(this), event_seq(0), expiring_events(0), expired_events(0), mdsmap_up_features(0), - submit_mutex("MDLog::submit_mutex"), submit_thread(this), cur_event(NULL) { } ~MDLog(); @@ -227,9 +226,10 @@ public: _prepare_new_segment(); } void journal_segment_subtree_map(MDSContext *onsync=NULL) { - submit_mutex.Lock(); - _journal_segment_subtree_map(onsync); - submit_mutex.Unlock(); + { + std::lock_guard l{submit_mutex}; + _journal_segment_subtree_map(onsync); + } if (onsync) flush(); } @@ -284,13 +284,13 @@ public: void submit_entry(LogEvent *e, MDSLogContextBase *c = 0) { std::lock_guard l(submit_mutex); _submit_entry(e, c); - submit_cond.Signal(); + submit_cond.notify_all(); } void start_submit_entry(LogEvent *e, MDSLogContextBase *c = 0) { std::lock_guard l(submit_mutex); _start_entry(e); _submit_entry(e, c); - submit_cond.Signal(); + submit_cond.notify_all(); } bool entry_is_open() const { return cur_event != NULL; } diff --git a/src/mds/MDSContext.cc b/src/mds/MDSContext.cc index f520737e454fc..bdf265965eea2 100644 --- a/src/mds/MDSContext.cc +++ b/src/mds/MDSContext.cc @@ -24,7 +24,7 @@ void MDSContext::complete(int r) { MDSRank *mds = get_mds(); ceph_assert(mds != nullptr); - ceph_assert(mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); dout(10) << "MDSContext::complete: " << typeid(*this).name() << dendl; return Context::complete(r); } diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 56038477fe680..0a80bf4db1412 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -63,7 +63,7 @@ 
// cons/des MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) : Dispatcher(m->cct), - mds_lock("MDSDaemon::mds_lock"), + mds_lock(ceph::make_mutex("MDSDaemon::mds_lock")), stopping(false), timer(m->cct, mds_lock), gss_ktfile_client(m->cct->_conf.get_val("gss_ktab_client_file")), @@ -378,9 +378,9 @@ int MDSDaemon::init() r = monc->init(); if (r < 0) { derr << "ERROR: failed to init monc: " << cpp_strerror(-r) << dendl; - mds_lock.Lock(); + mds_lock.lock(); suicide(); - mds_lock.Unlock(); + mds_lock.unlock(); return r; } @@ -394,9 +394,9 @@ int MDSDaemon::init() r = monc->authenticate(); if (r < 0) { derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl; - mds_lock.Lock(); + mds_lock.lock(); suicide(); - mds_lock.Unlock(); + mds_lock.unlock(); return r; } @@ -410,19 +410,18 @@ int MDSDaemon::init() } derr << "ERROR: failed to refresh rotating keys, " << "maximum retry time reached." << dendl; - mds_lock.Lock(); + std::lock_guard locker{mds_lock}; suicide(); - mds_lock.Unlock(); return -ETIMEDOUT; } mgrc.init(); messenger->add_dispatcher_head(&mgrc); - mds_lock.Lock(); + mds_lock.lock(); if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) { dout(4) << __func__ << ": terminated already, dropping out" << dendl; - mds_lock.Unlock(); + mds_lock.unlock(); return 0; } @@ -430,16 +429,15 @@ int MDSDaemon::init() monc->sub_want("mgrmap", 0, 0); monc->renew_subs(); - mds_lock.Unlock(); + mds_lock.unlock(); // Set up admin socket before taking mds_lock, so that ordering // is consistent (later we take mds_lock within asok callbacks) set_up_admin_socket(); - mds_lock.Lock(); + std::lock_guard locker{mds_lock}; if (beacon.get_want_state() == MDSMap::STATE_DNE) { suicide(); // we could do something more graceful here dout(4) << __func__ << ": terminated already, dropping out" << dendl; - mds_lock.Unlock(); return 0; } @@ -450,8 +448,6 @@ int MDSDaemon::init() // schedule tick reset_tick(); - mds_lock.Unlock(); - return 0; } @@ -464,7 +460,7 @@ void MDSDaemon::reset_tick() tick_event = timer.add_event_after( g_conf()->mds_tick_interval, new FunctionContext([this](int) { - ceph_assert(mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); tick(); })); } @@ -934,7 +930,7 @@ void MDSDaemon::handle_signal(int signum) void MDSDaemon::suicide() { - ceph_assert(mds_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(mds_lock)); // make sure we don't suicide twice ceph_assert(stopping == false); diff --git a/src/mds/MDSDaemon.h b/src/mds/MDSDaemon.h index 13b29d19d6b59..02794153ec0bc 100644 --- a/src/mds/MDSDaemon.h +++ b/src/mds/MDSDaemon.h @@ -24,7 +24,7 @@ #include "messages/MMonCommand.h" #include "common/LogClient.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Timer.h" #include "include/Context.h" #include "include/types.h" @@ -46,7 +46,7 @@ class MDSDaemon : public Dispatcher { * also check the `stopping` flag. If stopping is true, you * must either do nothing and immediately drop the lock, or * never drop the lock again (i.e. 
call respawn()) */ - Mutex mds_lock; + ceph::mutex mds_lock; bool stopping; SafeTimer timer; diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 6cd44270bd7fa..3d8996d380ec2 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -51,7 +51,7 @@ public: } void send() { - assert(mds->mds_lock.is_locked()); + assert(ceph_mutex_is_locked(mds->mds_lock)); dout(20) << __func__ << dendl; @@ -253,7 +253,7 @@ public: void send() { // not really a hard requirement here, but lets ensure this in // case we change the logic here. - assert(mds->mds_lock.is_locked()); + assert(ceph_mutex_is_locked(mds->mds_lock)); dout(20) << __func__ << dendl; f->open_object_section("result"); @@ -270,7 +270,6 @@ private: C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish) : MDSInternalContext(mds), timeout(timeout), - lock("mds::context::timeout", false, true), on_finish(on_finish) { } ~C_ContextTimeout() { @@ -308,7 +307,7 @@ private: } uint64_t timeout; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("mds::context::timeout"); Context *on_finish = nullptr; Context *timer_task = nullptr; }; @@ -473,7 +472,7 @@ private: MDSRank::MDSRank( mds_rank_t whoami_, - Mutex &mds_lock_, + ceph::mutex &mds_lock_, LogChannelRef &clog_, SafeTimer &timer_, Beacon &beacon_, @@ -831,9 +830,9 @@ void MDSRankDispatcher::shutdown() purge_queue.shutdown(); - mds_lock.Unlock(); + mds_lock.unlock(); finisher->stop(); // no flushing - mds_lock.Lock(); + mds_lock.lock(); if (objecter->initialized) objecter->shutdown(); @@ -846,12 +845,12 @@ void MDSRankDispatcher::shutdown() // release mds_lock for finisher/messenger threads (e.g. // MDSDaemon::ms_handle_reset called from Messenger). - mds_lock.Unlock(); + mds_lock.unlock(); // shut down messenger messenger->shutdown(); - mds_lock.Lock(); + mds_lock.lock(); // Workaround unclean shutdown: HeartbeatMap will assert if // worker is not removed (as we do in ~MDS), but ~MDS is not @@ -926,7 +925,7 @@ void MDSRank::respawn() void MDSRank::damaged() { ceph_assert(whoami != MDS_RANK_NONE); - ceph_assert(mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); beacon.set_want_state(*mdsmap, MDSMap::STATE_DAMAGED); monc->flush_log(); // Flush any clog error from before we were called @@ -968,13 +967,13 @@ void MDSRank::handle_write_error(int err) void *MDSRank::ProgressThread::entry() { - std::lock_guard l(mds->mds_lock); + std::unique_lock l(mds->mds_lock); while (true) { - while (!mds->stopping && - mds->finished_queue.empty() && - (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) { - cond.Wait(mds->mds_lock); - } + cond.wait(l, [this] { + return (mds->stopping || + !mds->finished_queue.empty() || + (!mds->waiting_for_nolaggy.empty() && !mds->beacon.is_laggy())); + }); if (mds->stopping) { break; @@ -989,18 +988,18 @@ void *MDSRank::ProgressThread::entry() void MDSRank::ProgressThread::shutdown() { - ceph_assert(mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); ceph_assert(mds->stopping); if (am_self()) { // Stopping is set, we will fall out of our main loop naturally } else { // Kick the thread to notice mds->stopping, and join it - cond.Signal(); - mds->mds_lock.Unlock(); + cond.notify_all(); + mds->mds_lock.unlock(); if (is_started()) join(); - mds->mds_lock.Lock(); + mds->mds_lock.lock(); } } @@ -1228,7 +1227,7 @@ bool MDSRank::handle_deferrable_message(const cref_t &m) */ void MDSRank::_advance_queues() { - ceph_assert(mds_lock.is_locked_by_me()); + 
ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); if (!finished_queue.empty()) { dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl; @@ -1617,7 +1616,7 @@ void MDSRank::boot_start(BootStep step, int r) void MDSRank::validate_sessions() { - ceph_assert(mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); bool valid = true; // Identify any sessions which have state inconsistent with other, @@ -2458,11 +2457,10 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command, ss << "no target epoch given"; return true; } - - mds_lock.Lock(); - set_osd_epoch_barrier(target_epoch); - mds_lock.Unlock(); - + { + std::lock_guard l(mds_lock); + set_osd_epoch_barrier(target_epoch); + } C_SaferCond cond; bool already_got = objecter->wait_for_map(target_epoch, &cond); if (!already_got) { @@ -2482,8 +2480,7 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command, ss << "Invalid client_id specified"; return true; } - - mds_lock.Lock(); + std::lock_guard l(mds_lock); std::stringstream dss; bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true, g_conf()->mds_session_blacklist_on_evict, dss); @@ -2491,7 +2488,6 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command, dout(15) << dss.str() << dendl; ss << dss.str(); } - mds_lock.Unlock(); } else if (command == "session config") { int64_t client_id; std::string option; @@ -2501,9 +2497,8 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command, cmd_getval(g_ceph_context, cmdmap, "option", option); bool got_value = cmd_getval(g_ceph_context, cmdmap, "value", value); - mds_lock.Lock(); + std::lock_guard l(mds_lock); config_client(client_id, !got_value, option, value, ss); - mds_lock.Unlock(); } else if (command == "scrub_path") { string path; vector scrubop_vec; @@ -3369,7 +3364,7 @@ bool MDSRank::evict_client(int64_t session_id, bool wait, bool blacklist, std::ostream& err_ss, Context *on_killed) { - ceph_assert(mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); // Mutually exclusive args ceph_assert(!(wait && on_killed != nullptr)); @@ -3405,7 +3400,7 @@ bool MDSRank::evict_client(int64_t session_id, std::vector cmd = {tmp}; auto kill_client_session = [this, session_id, wait, on_killed](){ - ceph_assert(mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); Session *session = sessionmap.get_session( entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id)); if (session) { @@ -3415,9 +3410,9 @@ bool MDSRank::evict_client(int64_t session_id, C_SaferCond on_safe; server->kill_session(session, &on_safe); - mds_lock.Unlock(); + mds_lock.unlock(); on_safe.wait(); - mds_lock.Lock(); + mds_lock.lock(); } } else { dout(1) << "session " << session_id << " was removed while we waited " @@ -3432,7 +3427,7 @@ bool MDSRank::evict_client(int64_t session_id, }; auto apply_blacklist = [this, cmd](std::function fn){ - ceph_assert(mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds_lock)); Context *on_blacklist_done = new FunctionContext([this, fn](int r) { objecter->wait_for_latest_osdmap( @@ -3458,9 +3453,9 @@ bool MDSRank::evict_client(int64_t session_id, if (blacklist) { C_SaferCond inline_ctx; apply_blacklist([&inline_ctx](){inline_ctx.complete(0);}); - mds_lock.Unlock(); + mds_lock.unlock(); inline_ctx.wait(); - mds_lock.Lock(); + mds_lock.lock(); } // We dropped mds_lock, so check that session still exists @@ -3505,7 +3500,7 @@ Context 
*MDSRank::create_async_exec_context(C_ExecAndReply *ctx) { MDSRankDispatcher::MDSRankDispatcher( mds_rank_t whoami_, - Mutex &mds_lock_, + ceph::mutex &mds_lock_, LogChannelRef &clog_, SafeTimer &timer_, Beacon &beacon_, diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index 8dc422966cb04..50d25fc6a20cc 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -149,7 +149,7 @@ class MDSRank { // Reference to global MDS::mds_lock, so that users of MDSRank don't // carry around references to the outer MDS, and we can substitute // a separate lock here in future potentially. - Mutex &mds_lock; + ceph::mutex &mds_lock; mono_time get_starttime() const { return starttime; @@ -243,12 +243,12 @@ class MDSRank { class ProgressThread : public Thread { MDSRank *mds; - Cond cond; + ceph::condition_variable cond; public: explicit ProgressThread(MDSRank *mds_) : mds(mds_) {} void * entry() override; void shutdown(); - void signal() {cond.Signal();} + void signal() {cond.notify_all();} } progress_thread; list> waiting_for_nolaggy; @@ -325,7 +325,7 @@ class MDSRank { MDSRank( mds_rank_t whoami_, - Mutex &mds_lock_, + ceph::mutex &mds_lock_, LogChannelRef &clog_, SafeTimer &timer_, Beacon &beacon_, @@ -639,7 +639,7 @@ public: MDSRankDispatcher( mds_rank_t whoami_, - Mutex &mds_lock_, + ceph::mutex &mds_lock_, LogChannelRef &clog_, SafeTimer &timer_, Beacon &beacon_, diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc index 372666b1aa3f3..a7d5c7c198100 100644 --- a/src/mds/PurgeQueue.cc +++ b/src/mds/PurgeQueue.cc @@ -81,7 +81,6 @@ PurgeQueue::PurgeQueue( : cct(cct_), rank(rank_), - lock("PurgeQueue"), metadata_pool(metadata_pool_), finisher(cct, "PurgeQueue", "PQ_Finisher"), timer(cct, lock), @@ -231,7 +230,7 @@ void PurgeQueue::wait_for_recovery(Context* c) void PurgeQueue::_recover() { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); // Journaler::is_readable() adjusts write_pos if partial entry is encountered while (1) { @@ -412,7 +411,7 @@ void PurgeQueue::_go_readonly(int r) bool PurgeQueue::_consume() { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); bool could_consume = false; while(_can_consume()) { @@ -478,7 +477,7 @@ void PurgeQueue::_execute_item( const PurgeItem &item, uint64_t expire_to) { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); in_flight[expire_to] = item; logger->set(l_pq_executing, in_flight.size()); @@ -582,7 +581,7 @@ void PurgeQueue::_execute_item( void PurgeQueue::_execute_item_complete( uint64_t expire_to) { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl; ceph_assert(in_flight.count(expire_to) == 1); diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h index c1d2de46eef3f..7f42ac05be92f 100644 --- a/src/mds/PurgeQueue.h +++ b/src/mds/PurgeQueue.h @@ -106,7 +106,7 @@ class PurgeQueue protected: CephContext *cct; const mds_rank_t rank; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("PurgeQueue"); bool readonly = false; int64_t metadata_pool; diff --git a/src/mds/ScrubStack.cc b/src/mds/ScrubStack.cc index 32205afc219f3..6e731b6591d71 100644 --- a/src/mds/ScrubStack.cc +++ b/src/mds/ScrubStack.cc @@ -85,7 +85,7 @@ void ScrubStack::_enqueue_inode(CInode *in, CDentry *parent, { dout(10) << __func__ << " with {" << *in << "}" << ", on_finish=" << on_finish << ", top=" << top << dendl; - 
ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); in->scrub_initialize(parent, header, on_finish); if (top) push_inode(in); @@ -108,7 +108,7 @@ void ScrubStack::enqueue_inode(CInode *in, ScrubHeaderRef& header, void ScrubStack::kick_off_scrubs() { - ceph_assert(mdcache->mds->mds_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock)); dout(20) << __func__ << ": state=" << state << dendl; if (clear_inode_stack || state == STATE_PAUSING || state == STATE_PAUSED) { @@ -499,7 +499,7 @@ ScrubStack::C_KickOffScrubs::C_KickOffScrubs(MDCache *mdcache, ScrubStack *s) : MDSInternalContext(mdcache->mds), stack(s) { } void ScrubStack::complete_control_contexts(int r) { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); for (auto &ctx : control_ctxs) { ctx->complete(r); @@ -516,7 +516,7 @@ void ScrubStack::set_state(State next_state) { } bool ScrubStack::scrub_in_transition_state() { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); dout(20) << __func__ << ": state=" << state << dendl; // STATE_RUNNING is considered as a transition state so as to @@ -529,7 +529,7 @@ bool ScrubStack::scrub_in_transition_state() { } void ScrubStack::scrub_status(Formatter *f) { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); f->open_object_section("result"); @@ -600,7 +600,7 @@ void ScrubStack::scrub_status(Formatter *f) { } void ScrubStack::abort_pending_scrubs() { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); ceph_assert(clear_inode_stack); for (auto inode = inode_stack.begin(); !inode.end(); ++inode) { @@ -622,7 +622,7 @@ void ScrubStack::abort_pending_scrubs() { } void ScrubStack::scrub_abort(Context *on_finish) { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); ceph_assert(on_finish != nullptr); dout(10) << __func__ << ": aborting with " << scrubs_in_progress @@ -643,7 +643,7 @@ void ScrubStack::scrub_abort(Context *on_finish) { } void ScrubStack::scrub_pause(Context *on_finish) { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); ceph_assert(on_finish != nullptr); dout(10) << __func__ << ": pausing with " << scrubs_in_progress @@ -668,7 +668,7 @@ void ScrubStack::scrub_pause(Context *on_finish) { } bool ScrubStack::scrub_resume() { - ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock)); dout(20) << __func__ << ": state=" << state << dendl; int r = 0; diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 92ad6e8a5c9de..ebaa1fa6aa017 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -381,7 +381,7 @@ void Server::finish_reclaim_session(Session *session, const ref_tget_client().v; send_reply = new FunctionContext([this, session_id, reply](int r) { - assert(mds->mds_lock.is_locked_by_me()); + assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(session_id)); if (!session) { return; @@ -1098,7 +1098,7 @@ void Server::handle_conf_change(const std::set& changed) { */ void Server::kill_session(Session *session, Context *on_safe) { - 
ceph_assert(mds->mds_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); if ((session->is_opening() || session->is_open() || diff --git a/src/mgr/DaemonState.cc b/src/mgr/DaemonState.cc index 10e526f8d562d..3e3a30aeeb8d0 100644 --- a/src/mgr/DaemonState.cc +++ b/src/mgr/DaemonState.cc @@ -134,7 +134,7 @@ void DeviceState::print(ostream& out) const void DaemonStateIndex::insert(DaemonStatePtr dm) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; _insert(dm); } @@ -184,7 +184,7 @@ void DaemonStateIndex::_erase(const DaemonKey& dmk) DaemonStateCollection DaemonStateIndex::get_by_service( const std::string& svc) const { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; DaemonStateCollection result; @@ -200,7 +200,7 @@ DaemonStateCollection DaemonStateIndex::get_by_service( DaemonStateCollection DaemonStateIndex::get_by_server( const std::string &hostname) const { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; if (by_server.count(hostname)) { return by_server.at(hostname); @@ -211,14 +211,14 @@ DaemonStateCollection DaemonStateIndex::get_by_server( bool DaemonStateIndex::exists(const DaemonKey &key) const { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; return all.count(key) > 0; } DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key) { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; auto iter = all.find(key); if (iter != all.end()) { @@ -230,7 +230,7 @@ DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key) void DaemonStateIndex::rm(const DaemonKey &key) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; _rm(key); } @@ -246,7 +246,7 @@ void DaemonStateIndex::cull(const std::string& svc_name, { std::vector victims; - RWLock::WLocker l(lock); + std::unique_lock l{lock}; auto begin = all.lower_bound({svc_name, ""}); auto end = all.end(); for (auto &i = begin; i != end; ++i) { diff --git a/src/mgr/DaemonState.h b/src/mgr/DaemonState.h index ce29e9cd9030a..17ede40aa1513 100644 --- a/src/mgr/DaemonState.h +++ b/src/mgr/DaemonState.h @@ -126,7 +126,7 @@ class DaemonPerfCounters class DaemonState { public: - Mutex lock = {"DaemonState::lock"}; + ceph::mutex lock = ceph::make_mutex("DaemonState::lock"); DaemonKey key; @@ -240,7 +240,8 @@ typedef boost::intrusive_ptr DeviceStateRef; class DaemonStateIndex { private: - mutable RWLock lock = {"DaemonStateIndex", true, true, true}; + mutable ceph::shared_mutex lock = + ceph::make_shared_mutex("DaemonStateIndex", true, true, true); std::map by_server; DaemonStateCollection all; @@ -286,7 +287,7 @@ public: template auto with_daemons_by_server(Callback&& cb, Args&&... args) const -> decltype(cb(by_server, std::forward(args)...)) { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; return std::forward(cb)(by_server, std::forward(args)...); } @@ -294,7 +295,7 @@ public: template bool with_device(const std::string& dev, Callback&& cb, Args&&... args) const { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; auto p = devices.find(dev); if (p == devices.end()) { return false; @@ -306,7 +307,7 @@ public: template bool with_device_write(const std::string& dev, Callback&& cb, Args&&... args) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; auto p = devices.find(dev); if (p == devices.end()) { return false; @@ -321,14 +322,14 @@ public: template void with_device_create(const std::string& dev, Callback&& cb, Args&&... 
args) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; auto d = _get_or_create_device(dev); std::forward(cb)(*d, std::forward(args)...); } template void with_devices(Callback&& cb, Args&&... args) const { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; for (auto& i : devices) { std::forward(cb)(*i.second, std::forward(args)...); } @@ -338,7 +339,7 @@ public: void with_devices2(CallbackInitial&& cbi, // with lock taken Callback&& cb, // for each device Args&&... args) const { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; cbi(); for (auto& i : devices) { std::forward(cb)(*i.second, std::forward(args)...); @@ -357,25 +358,25 @@ public: } void notify_updating(const DaemonKey &k) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; updating.insert(k); } void clear_updating(const DaemonKey &k) { - RWLock::WLocker l(lock); + std::unique_lock l{lock}; updating.erase(k); } bool is_updating(const DaemonKey &k) { - RWLock::RLocker l(lock); + std::shared_lock l{lock}; return updating.count(k) > 0; } void update_metadata(DaemonStatePtr state, const map& meta) { // remove and re-insert in case the device metadata changed - RWLock::WLocker l(lock); + std::unique_lock l{lock}; _rm(state->key); { - Mutex::Locker l2(state->lock); + std::lock_guard l2{state->lock}; state->set_metadata(meta); } _insert(state); -- 2.39.5
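
For review, a condensed sketch of the locking idiom this series converts to. It is only a sketch: it assumes common/ceph_mutex.h from the ceph tree (ceph::make_mutex, ceph::condition_variable, ceph_mutex_is_locked_by_me), and the Worker class with its queue is illustrative, not code from this patch.

#include <chrono>
#include <list>
#include <mutex>

#include "common/ceph_mutex.h"
#include "include/ceph_assert.h"

class Worker {
  // was: Mutex lock("Worker::lock"); Cond cond;
  ceph::mutex lock = ceph::make_mutex("Worker::lock");
  ceph::condition_variable cond;
  std::list<int> queue;

public:
  void queue_item(int v) {
    std::lock_guard l{lock};      // was: lock.Lock(); ... lock.Unlock();
    queue.push_back(v);
    cond.notify_all();            // was: cond.Signal();
  }

  void _drain_locked() {
    // was: ceph_assert(lock.is_locked_by_me());
    ceph_assert(ceph_mutex_is_locked_by_me(lock));
    queue.clear();
  }

  bool wait_briefly() {
    std::unique_lock l{lock};
    // was: cond.WaitInterval(lock, utime_t(5, 0)), timeout reported as ETIMEDOUT
    std::cv_status r = cond.wait_for(l, std::chrono::seconds(5));
    return r == std::cv_status::timeout;
  }
};

Two related points from the diff itself: C_SafeCond now takes the mutex and condition variable by reference rather than by pointer (see the MDBalancer::localize_balancer hunk), and, if I read common/ceph_mutex.h right, ceph::mutex is plain std::mutex outside debug builds, in which case ceph_mutex_is_locked_by_me() collapses to a constant true and these asserts only bite in debug builds.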
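
Similarly, for the DaemonStateIndex changes, a sketch of the RWLock-to-shared_mutex conversion: RWLock::RLocker/WLocker become std::shared_lock/std::unique_lock over a ceph::shared_mutex. Again this assumes common/ceph_mutex.h; the Index class below is illustrative (constructed here with only a name, whereas the patch forwards the old RWLock flags), not the real DaemonStateIndex.

#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>

#include "common/ceph_mutex.h"

class Index {
  // was: mutable RWLock lock = {"Index", true, true, true};
  mutable ceph::shared_mutex lock = ceph::make_shared_mutex("Index");
  std::map<std::string, int> all;

public:
  bool exists(const std::string& key) const {
    std::shared_lock l{lock};     // was: RWLock::RLocker l(lock);
    return all.count(key) > 0;
  }

  void insert(const std::string& key, int v) {
    std::unique_lock l{lock};     // was: RWLock::WLocker l(lock);
    all[key] = v;
  }
};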