shutdown:
// yuck: grab the mds lock, so we can be sure that whoever in *mds
// called shutdown finishes what they were doing.
- mds->mds_lock.Lock();
- mds->mds_lock.Unlock();
+ mds->mds_lock.lock();
+ mds->mds_lock.unlock();
pidfile_remove();
}
// I'm going to touch this MDS, so it must be locked
- ceph_assert(mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
health.metrics.clear();
#include "mdstypes.h"
class Objecter;
-class Mutex;
// This always lives in the same location for a given MDS
// instance, it tells the daemon where to look for the journal.
C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
  // We are used as an MDSCacheObject waiter, so should
  // only be invoked by someone already holding the big lock.
  ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
  // Pin the object so it stays alive until finish() runs.
  p->get(MDSCacheObject::PIN_PTRWAITER);
}
void finish(int r) override {
bool ack = false;
int r = 0;
bufferlist lua_src;
- Mutex lock("lock");
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("lock");
+ ceph::condition_variable cond;
/* we assume that balancer is in the metadata pool */
object_t oid = object_t(mds->mdsmap->get_balancer());
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
- new C_SafeCond(&lock, &cond, &ack, &r));
+ new C_SafeCond(lock, cond, &ack, &r));
dout(15) << "launched non-blocking read tid=" << tid
<< " oid=" << oid << " oloc=" << oloc << dendl;
/* timeout: if we waste half our time waiting for RADOS, then abort! */
- auto bal_interval = g_conf().get_val<int64_t>("mds_bal_interval");
- lock.Lock();
- int ret_t = cond.WaitInterval(lock, utime_t(bal_interval / 2, 0));
- lock.Unlock();
-
+ std::cv_status ret_t = [&] {
+ auto bal_interval = g_conf().get_val<int64_t>("mds_bal_interval");
+ std::unique_lock locker{lock};
+ return cond.wait_for(locker, std::chrono::seconds(bal_interval / 2));
+ }();
/* success: store the balancer in memory and set the version. */
if (!r) {
- if (ret_t == ETIMEDOUT) {
+ if (ret_t == std::cv_status::timeout) {
mds->objecter->op_cancel(tid, -ECANCELED);
return -ETIMEDOUT;
}
void MDLog::_start_entry(LogEvent *e)
{
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
ceph_assert(cur_event == NULL);
cur_event = e;
void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
{
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
ceph_assert(!mds->is_any_replay());
ceph_assert(!capped);
{
dout(10) << "_submit_thread start" << dendl;
- submit_mutex.Lock();
+ std::unique_lock locker{submit_mutex};
while (!mds->is_daemon_stopping()) {
if (g_conf()->mds_log_pause) {
- submit_cond.Wait(submit_mutex);
+ submit_cond.wait(locker);
continue;
}
map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
if (it == pending_events.end()) {
- submit_cond.Wait(submit_mutex);
+ submit_cond.wait(locker);
continue;
}
PendingEvent data = it->second.front();
it->second.pop_front();
- submit_mutex.Unlock();
+ locker.unlock();
if (data.le) {
LogEvent *le = data.le;
journaler->flush();
}
- submit_mutex.Lock();
+ locker.lock();
if (data.flush)
unflushed = 0;
else if (data.le)
unflushed++;
}
-
- submit_mutex.Unlock();
}
void MDLog::wait_for_safe(MDSContext *c)
{
- submit_mutex.Lock();
+ submit_mutex.lock();
bool no_pending = true;
if (!pending_events.empty()) {
pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
no_pending = false;
- submit_cond.Signal();
+ submit_cond.notify_all();
}
- submit_mutex.Unlock();
+ submit_mutex.unlock();
if (no_pending && c)
journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
void MDLog::flush()
{
- submit_mutex.Lock();
+ submit_mutex.lock();
bool do_flush = unflushed > 0;
unflushed = 0;
if (!pending_events.empty()) {
pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
do_flush = false;
- submit_cond.Signal();
+ submit_cond.notify_all();
}
- submit_mutex.Unlock();
+ submit_mutex.unlock();
if (do_flush)
journaler->flush();
// Wake the submit thread so it re-checks its wait conditions
// (pending_events and the mds_log_pause flag).
void MDLog::kick_submitter()
{
  std::lock_guard l(submit_mutex);
  submit_cond.notify_all();
}
void MDLog::cap()
void MDLog::shutdown()
{
- ceph_assert(mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
dout(5) << "shutdown" << dendl;
if (submit_thread.is_started()) {
// returning from suicide, and subsequently respect mds->is_daemon_stopping()
// and fall out of its loop.
} else {
- mds->mds_lock.Unlock();
+ mds->mds_lock.unlock();
// Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
// picking it up will do anything with it.
- submit_mutex.Lock();
- submit_cond.Signal();
- submit_mutex.Unlock();
+ submit_mutex.lock();
+ submit_cond.notify_all();
+ submit_mutex.unlock();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
submit_thread.join();
}
}
if (replay_thread.is_started() && !replay_thread.am_self()) {
- mds->mds_lock.Unlock();
+ mds->mds_lock.unlock();
replay_thread.join();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
}
if (recovery_thread.is_started() && !recovery_thread.am_self()) {
- mds->mds_lock.Unlock();
+ mds->mds_lock.unlock();
recovery_thread.join();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
}
}
void MDLog::_prepare_new_segment()
{
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
uint64_t seq = event_seq + 1;
dout(7) << __func__ << " seq " << seq << dendl;
void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
{
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
dout(7) << __func__ << dendl;
ESubtreeMap *sle = mds->mdcache->create_subtree_map();
max_events = g_conf()->mds_log_events_per_segment + 1;
}
- submit_mutex.Lock();
+ submit_mutex.lock();
// trim!
dout(10) << "trim "
<< dendl;
if (segments.empty()) {
- submit_mutex.Unlock();
+ submit_mutex.unlock();
return;
}
new_expiring_segments++;
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.Unlock();
+ submit_mutex.unlock();
uint64_t last_seq = ls->seq;
try_expire(ls, op_prio);
- submit_mutex.Lock();
+ submit_mutex.lock();
p = segments.lower_bound(last_seq + 1);
}
}
uint64_t last_seq = get_last_segment_seq();
if (mds->mdcache->open_file_table.is_any_dirty() ||
last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) {
- submit_mutex.Unlock();
+ submit_mutex.unlock();
mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
last_seq, CEPH_MSG_PRIO_HIGH);
- submit_mutex.Lock();
+ submit_mutex.lock();
}
}
*/
int MDLog::trim_all()
{
- submit_mutex.Lock();
+ submit_mutex.lock();
dout(10) << __func__ << ": "
<< segments.size()
if (!capped &&
!mds->mdcache->open_file_table.is_any_committing() &&
last_seq > mds->mdcache->open_file_table.get_committing_log_seq()) {
- submit_mutex.Unlock();
+ submit_mutex.unlock();
mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
last_seq, CEPH_MSG_PRIO_DEFAULT);
- submit_mutex.Lock();
+ submit_mutex.lock();
}
}
// Caller should have flushed journaler before calling this
if (pending_events.count(ls->seq)) {
dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
- submit_mutex.Unlock();
+ submit_mutex.unlock();
return -EAGAIN;
}
ceph_assert(expiring_segments.count(ls) == 0);
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.Unlock();
+ submit_mutex.unlock();
uint64_t next_seq = ls->seq + 1;
try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
- submit_mutex.Lock();
+ submit_mutex.lock();
p = segments.lower_bound(next_seq);
}
}
gather_bld.activate();
} else {
dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
- submit_mutex.Lock();
+ submit_mutex.lock();
ceph_assert(expiring_segments.count(ls));
expiring_segments.erase(ls);
expiring_events -= ls->num_events;
_expired(ls);
- submit_mutex.Unlock();
+ submit_mutex.unlock();
}
logger->set(l_mdl_segexg, expiring_segments.size());
void MDLog::_trim_expired_segments()
{
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
trimmed = true;
}
- submit_mutex.Unlock();
+ submit_mutex.unlock();
if (trimmed)
journaler->write_head(0);
void MDLog::trim_expired_segments()
{
  submit_mutex.lock();
  // NOTE: _trim_expired_segments() requires submit_mutex held and
  // releases it itself before returning, so there is no unlock here.
  _trim_expired_segments();
}
void MDLog::_expired(LogSegment *ls)
{
- ceph_assert(submit_mutex.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
<< ", " << ls->num_events << " events" << dendl;
int64_t mdsmap_up_features;
map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
- Mutex submit_mutex;
- Cond submit_cond;
+ ceph::mutex submit_mutex = ceph::make_mutex("MDLog::submit_mutex");
+ ceph::condition_variable submit_cond;
void set_safe_pos(uint64_t pos)
{
recovery_thread(this),
event_seq(0), expiring_events(0), expired_events(0),
mdsmap_up_features(0),
- submit_mutex("MDLog::submit_mutex"),
submit_thread(this),
cur_event(NULL) { }
~MDLog();
_prepare_new_segment();
}
void journal_segment_subtree_map(MDSContext *onsync=NULL) {
  {
    // Hold submit_mutex only while journaling the subtree map;
    // flush() acquires submit_mutex itself, so release it first.
    std::lock_guard l{submit_mutex};
    _journal_segment_subtree_map(onsync);
  }
  if (onsync)
    flush();
}
// Queue a log event (with optional completion) and wake the submit thread.
void submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
  std::lock_guard l(submit_mutex);
  _submit_entry(e, c);
  // Wake the submit thread to pick up the newly queued event.
  submit_cond.notify_all();
}
// Convenience: open the current event and immediately submit it,
// then wake the submit thread.
void start_submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
  std::lock_guard l(submit_mutex);
  _start_entry(e);
  _submit_entry(e, c);
  submit_cond.notify_all();
}
bool entry_is_open() const { return cur_event != NULL; }
void MDSContext::complete(int r) {
  MDSRank *mds = get_mds();
  ceph_assert(mds != nullptr);
  // MDS contexts may only complete while the big MDS lock is held.
  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
  dout(10) << "MDSContext::complete: " << typeid(*this).name() << dendl;
  return Context::complete(r);
}
// cons/des
MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
- mds_lock("MDSDaemon::mds_lock"),
+ mds_lock(ceph::make_mutex("MDSDaemon::mds_lock")),
stopping(false),
timer(m->cct, mds_lock),
gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
r = monc->init();
if (r < 0) {
derr << "ERROR: failed to init monc: " << cpp_strerror(-r) << dendl;
- mds_lock.Lock();
+ mds_lock.lock();
suicide();
- mds_lock.Unlock();
+ mds_lock.unlock();
return r;
}
r = monc->authenticate();
if (r < 0) {
derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
- mds_lock.Lock();
+ mds_lock.lock();
suicide();
- mds_lock.Unlock();
+ mds_lock.unlock();
return r;
}
}
derr << "ERROR: failed to refresh rotating keys, "
<< "maximum retry time reached." << dendl;
- mds_lock.Lock();
+ std::lock_guard locker{mds_lock};
suicide();
- mds_lock.Unlock();
return -ETIMEDOUT;
}
mgrc.init();
messenger->add_dispatcher_head(&mgrc);
- mds_lock.Lock();
+ mds_lock.lock();
if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
dout(4) << __func__ << ": terminated already, dropping out" << dendl;
- mds_lock.Unlock();
+ mds_lock.unlock();
return 0;
}
monc->sub_want("mgrmap", 0, 0);
monc->renew_subs();
- mds_lock.Unlock();
+ mds_lock.unlock();
// Set up admin socket before taking mds_lock, so that ordering
// is consistent (later we take mds_lock within asok callbacks)
set_up_admin_socket();
- mds_lock.Lock();
+ std::lock_guard locker{mds_lock};
if (beacon.get_want_state() == MDSMap::STATE_DNE) {
suicide(); // we could do something more graceful here
dout(4) << __func__ << ": terminated already, dropping out" << dendl;
- mds_lock.Unlock();
return 0;
}
// schedule tick
reset_tick();
- mds_lock.Unlock();
-
return 0;
}
tick_event = timer.add_event_after(
g_conf()->mds_tick_interval,
new FunctionContext([this](int) {
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
tick();
}));
}
void MDSDaemon::suicide()
{
- ceph_assert(mds_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(mds_lock));
// make sure we don't suicide twice
ceph_assert(stopping == false);
#include "messages/MMonCommand.h"
#include "common/LogClient.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Timer.h"
#include "include/Context.h"
#include "include/types.h"
* also check the `stopping` flag. If stopping is true, you
* must either do nothing and immediately drop the lock, or
* never drop the lock again (i.e. call respawn()) */
- Mutex mds_lock;
+ ceph::mutex mds_lock;
bool stopping;
SafeTimer timer;
}
void send() {
- assert(mds->mds_lock.is_locked());
+ assert(ceph_mutex_is_locked(mds->mds_lock));
dout(20) << __func__ << dendl;
void send() {
// not really a hard requirement here, but lets ensure this in
// case we change the logic here.
- assert(mds->mds_lock.is_locked());
+ assert(ceph_mutex_is_locked(mds->mds_lock));
dout(20) << __func__ << dendl;
f->open_object_section("result");
// The mutex member is initialized at its declaration (ceph::make_mutex),
// so the constructor no longer names it in the init list.
C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
  : MDSInternalContext(mds),
    timeout(timeout),
    on_finish(on_finish) {
}
~C_ContextTimeout() {
}
uint64_t timeout;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("mds::context::timeout");
Context *on_finish = nullptr;
Context *timer_task = nullptr;
};
MDSRank::MDSRank(
mds_rank_t whoami_,
- Mutex &mds_lock_,
+ ceph::mutex &mds_lock_,
LogChannelRef &clog_,
SafeTimer &timer_,
Beacon &beacon_,
purge_queue.shutdown();
- mds_lock.Unlock();
+ mds_lock.unlock();
finisher->stop(); // no flushing
- mds_lock.Lock();
+ mds_lock.lock();
if (objecter->initialized)
objecter->shutdown();
// release mds_lock for finisher/messenger threads (e.g.
// MDSDaemon::ms_handle_reset called from Messenger).
- mds_lock.Unlock();
+ mds_lock.unlock();
// shut down messenger
messenger->shutdown();
- mds_lock.Lock();
+ mds_lock.lock();
// Workaround unclean shutdown: HeartbeatMap will assert if
// worker is not removed (as we do in ~MDS), but ~MDS is not
void MDSRank::damaged()
{
ceph_assert(whoami != MDS_RANK_NONE);
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
beacon.set_want_state(*mdsmap, MDSMap::STATE_DAMAGED);
monc->flush_log(); // Flush any clog error from before we were called
void *MDSRank::ProgressThread::entry()
{
- std::lock_guard l(mds->mds_lock);
+ std::unique_lock l(mds->mds_lock);
while (true) {
- while (!mds->stopping &&
- mds->finished_queue.empty() &&
- (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
- cond.Wait(mds->mds_lock);
- }
+ cond.wait(l, [this] {
+ return (mds->stopping ||
+ !mds->finished_queue.empty() ||
+ (!mds->waiting_for_nolaggy.empty() && !mds->beacon.is_laggy()));
+ });
if (mds->stopping) {
break;
void MDSRank::ProgressThread::shutdown()
{
  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
  ceph_assert(mds->stopping);
  if (am_self()) {
    // Stopping is set, we will fall out of our main loop naturally
  } else {
    // Kick the thread to notice mds->stopping, and join it
    cond.notify_all();
    // Drop mds_lock while joining so the thread can finish its loop
    // (it waits on cond under mds_lock).
    mds->mds_lock.unlock();
    if (is_started())
      join();
    mds->mds_lock.lock();
  }
}
*/
void MDSRank::_advance_queues()
{
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
if (!finished_queue.empty()) {
dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
void MDSRank::validate_sessions()
{
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
bool valid = true;
// Identify any sessions which have state inconsistent with other,
ss << "no target epoch given";
return true;
}
-
- mds_lock.Lock();
- set_osd_epoch_barrier(target_epoch);
- mds_lock.Unlock();
-
+ {
+ std::lock_guard l(mds_lock);
+ set_osd_epoch_barrier(target_epoch);
+ }
C_SaferCond cond;
bool already_got = objecter->wait_for_map(target_epoch, &cond);
if (!already_got) {
ss << "Invalid client_id specified";
return true;
}
-
- mds_lock.Lock();
+ std::lock_guard l(mds_lock);
std::stringstream dss;
bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
g_conf()->mds_session_blacklist_on_evict, dss);
dout(15) << dss.str() << dendl;
ss << dss.str();
}
- mds_lock.Unlock();
} else if (command == "session config") {
int64_t client_id;
std::string option;
cmd_getval(g_ceph_context, cmdmap, "option", option);
bool got_value = cmd_getval(g_ceph_context, cmdmap, "value", value);
- mds_lock.Lock();
+ std::lock_guard l(mds_lock);
config_client(client_id, !got_value, option, value, ss);
- mds_lock.Unlock();
} else if (command == "scrub_path") {
string path;
vector<string> scrubop_vec;
bool wait, bool blacklist, std::ostream& err_ss,
Context *on_killed)
{
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
// Mutually exclusive args
ceph_assert(!(wait && on_killed != nullptr));
std::vector<std::string> cmd = {tmp};
auto kill_client_session = [this, session_id, wait, on_killed](){
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
Session *session = sessionmap.get_session(
entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
if (session) {
C_SaferCond on_safe;
server->kill_session(session, &on_safe);
- mds_lock.Unlock();
+ mds_lock.unlock();
on_safe.wait();
- mds_lock.Lock();
+ mds_lock.lock();
}
} else {
dout(1) << "session " << session_id << " was removed while we waited "
};
auto apply_blacklist = [this, cmd](std::function<void ()> fn){
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
objecter->wait_for_latest_osdmap(
if (blacklist) {
C_SaferCond inline_ctx;
apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});
- mds_lock.Unlock();
+ mds_lock.unlock();
inline_ctx.wait();
- mds_lock.Lock();
+ mds_lock.lock();
}
// We dropped mds_lock, so check that session still exists
MDSRankDispatcher::MDSRankDispatcher(
mds_rank_t whoami_,
- Mutex &mds_lock_,
+ ceph::mutex &mds_lock_,
LogChannelRef &clog_,
SafeTimer &timer_,
Beacon &beacon_,
// Reference to global MDS::mds_lock, so that users of MDSRank don't
// carry around references to the outer MDS, and we can substitute
// a separate lock here in future potentially.
- Mutex &mds_lock;
+ ceph::mutex &mds_lock;
mono_time get_starttime() const {
return starttime;
class ProgressThread : public Thread {
MDSRank *mds;
- Cond cond;
+ ceph::condition_variable cond;
public:
explicit ProgressThread(MDSRank *mds_) : mds(mds_) {}
void * entry() override;
void shutdown();
- void signal() {cond.Signal();}
+ void signal() {cond.notify_all();}
} progress_thread;
list<cref_t<Message>> waiting_for_nolaggy;
MDSRank(
mds_rank_t whoami_,
- Mutex &mds_lock_,
+ ceph::mutex &mds_lock_,
LogChannelRef &clog_,
SafeTimer &timer_,
Beacon &beacon_,
MDSRankDispatcher(
mds_rank_t whoami_,
- Mutex &mds_lock_,
+ ceph::mutex &mds_lock_,
LogChannelRef &clog_,
SafeTimer &timer_,
Beacon &beacon_,
:
cct(cct_),
rank(rank_),
- lock("PurgeQueue"),
metadata_pool(metadata_pool_),
finisher(cct, "PurgeQueue", "PQ_Finisher"),
timer(cct, lock),
void PurgeQueue::_recover()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
// Journaler::is_readable() adjusts write_pos if partial entry is encountered
while (1) {
bool PurgeQueue::_consume()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
bool could_consume = false;
while(_can_consume()) {
const PurgeItem &item,
uint64_t expire_to)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
in_flight[expire_to] = item;
logger->set(l_pq_executing, in_flight.size());
void PurgeQueue::_execute_item_complete(
uint64_t expire_to)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
ceph_assert(in_flight.count(expire_to) == 1);
protected:
CephContext *cct;
const mds_rank_t rank;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("PurgeQueue");
bool readonly = false;
int64_t metadata_pool;
{
dout(10) << __func__ << " with {" << *in << "}"
<< ", on_finish=" << on_finish << ", top=" << top << dendl;
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
in->scrub_initialize(parent, header, on_finish);
if (top)
push_inode(in);
void ScrubStack::kick_off_scrubs()
{
- ceph_assert(mdcache->mds->mds_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
dout(20) << __func__ << ": state=" << state << dendl;
if (clear_inode_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
: MDSInternalContext(mdcache->mds), stack(s) { }
void ScrubStack::complete_control_contexts(int r) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
for (auto &ctx : control_ctxs) {
ctx->complete(r);
}
bool ScrubStack::scrub_in_transition_state() {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
dout(20) << __func__ << ": state=" << state << dendl;
// STATE_RUNNING is considered as a transition state so as to
}
void ScrubStack::scrub_status(Formatter *f) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
f->open_object_section("result");
}
void ScrubStack::abort_pending_scrubs() {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
ceph_assert(clear_inode_stack);
for (auto inode = inode_stack.begin(); !inode.end(); ++inode) {
}
void ScrubStack::scrub_abort(Context *on_finish) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
ceph_assert(on_finish != nullptr);
dout(10) << __func__ << ": aborting with " << scrubs_in_progress
}
void ScrubStack::scrub_pause(Context *on_finish) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
ceph_assert(on_finish != nullptr);
dout(10) << __func__ << ": pausing with " << scrubs_in_progress
}
bool ScrubStack::scrub_resume() {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
dout(20) << __func__ << ": state=" << state << dendl;
int r = 0;
if (reply) {
int64_t session_id = session->get_client().v;
send_reply = new FunctionContext([this, session_id, reply](int r) {
- assert(mds->mds_lock.is_locked_by_me());
+ assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(session_id));
if (!session) {
return;
*/
void Server::kill_session(Session *session, Context *on_safe)
{
- ceph_assert(mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
if ((session->is_opening() ||
session->is_open() ||
void DaemonStateIndex::insert(DaemonStatePtr dm)
{
  // Exclusive (write) lock: _insert() mutates the index maps.
  std::unique_lock l{lock};
  _insert(dm);
}
DaemonStateCollection DaemonStateIndex::get_by_service(
const std::string& svc) const
{
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
DaemonStateCollection result;
DaemonStateCollection DaemonStateIndex::get_by_server(
const std::string &hostname) const
{
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
if (by_server.count(hostname)) {
return by_server.at(hostname);
bool DaemonStateIndex::exists(const DaemonKey &key) const
{
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
return all.count(key) > 0;
}
DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key)
{
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
auto iter = all.find(key);
if (iter != all.end()) {
void DaemonStateIndex::rm(const DaemonKey &key)
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
_rm(key);
}
{
std::vector<string> victims;
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
auto begin = all.lower_bound({svc_name, ""});
auto end = all.end();
for (auto &i = begin; i != end; ++i) {
class DaemonState
{
public:
- Mutex lock = {"DaemonState::lock"};
+ ceph::mutex lock = ceph::make_mutex("DaemonState::lock");
DaemonKey key;
class DaemonStateIndex
{
private:
- mutable RWLock lock = {"DaemonStateIndex", true, true, true};
+ mutable ceph::shared_mutex lock =
+ ceph::make_shared_mutex("DaemonStateIndex", true, true, true);
std::map<std::string, DaemonStateCollection> by_server;
DaemonStateCollection all;
template<typename Callback, typename...Args>
auto with_daemons_by_server(Callback&& cb, Args&&... args) const ->
decltype(cb(by_server, std::forward<Args>(args)...)) {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
return std::forward<Callback>(cb)(by_server, std::forward<Args>(args)...);
}
template<typename Callback, typename...Args>
bool with_device(const std::string& dev,
Callback&& cb, Args&&... args) const {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
auto p = devices.find(dev);
if (p == devices.end()) {
return false;
template<typename Callback, typename...Args>
bool with_device_write(const std::string& dev,
Callback&& cb, Args&&... args) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
auto p = devices.find(dev);
if (p == devices.end()) {
return false;
template<typename Callback, typename...Args>
void with_device_create(const std::string& dev,
Callback&& cb, Args&&... args) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
auto d = _get_or_create_device(dev);
std::forward<Callback>(cb)(*d, std::forward<Args>(args)...);
}
template<typename Callback, typename...Args>
void with_devices(Callback&& cb, Args&&... args) const {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
for (auto& i : devices) {
std::forward<Callback>(cb)(*i.second, std::forward<Args>(args)...);
}
void with_devices2(CallbackInitial&& cbi, // with lock taken
Callback&& cb, // for each device
Args&&... args) const {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
cbi();
for (auto& i : devices) {
std::forward<Callback>(cb)(*i.second, std::forward<Args>(args)...);
}
void notify_updating(const DaemonKey &k) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
updating.insert(k);
}
void clear_updating(const DaemonKey &k) {
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
updating.erase(k);
}
bool is_updating(const DaemonKey &k) {
- RWLock::RLocker l(lock);
+ std::shared_lock l{lock};
return updating.count(k) > 0;
}
void update_metadata(DaemonStatePtr state,
const map<string,string>& meta) {
// remove and re-insert in case the device metadata changed
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
_rm(state->key);
{
- Mutex::Locker l2(state->lock);
+ std::lock_guard l2{state->lock};
state->set_metadata(meta);
}
_insert(state);