From 35d0ce394f746158f2695efb4c09511eff82bd97 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 7 Jul 2019 11:16:03 +0800 Subject: [PATCH] osd: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/osd/OSD.cc | 223 +++++++++++++++++---------------------- src/osd/OSD.h | 68 ++++++------ src/osd/OSDMapMapping.cc | 2 +- src/osd/OSDMapMapping.h | 26 ++--- src/osd/PG.cc | 47 ++++----- src/osd/PG.h | 25 +++-- src/osd/PrimaryLogPG.cc | 1 - src/osd/PrimaryLogPG.h | 5 +- src/osd/Session.h | 16 ++- src/osd/Watch.cc | 33 +++--- src/osd/Watch.h | 6 +- 11 files changed, 201 insertions(+), 251 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 6d4acc24a42..1724ca213b7 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -232,54 +232,40 @@ OSDService::OSDService(OSD *osd) : publish_lock{ceph::make_mutex("OSDService::publish_lock")}, pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")}, max_oldest_map(0), - sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0), + scrubs_pending(0), scrubs_active(0), - agent_lock("OSDService::agent_lock"), agent_valid_iterator(false), agent_ops(0), flush_mode_high_count(0), agent_active(true), agent_thread(this), agent_stop_flag(false), - agent_timer_lock("OSDService::agent_timer_lock"), agent_timer(osd->client_messenger->cct, agent_timer_lock), last_recalibrate(ceph_clock_now()), promote_max_objects(0), promote_max_bytes(0), objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, NULL, 0, 0)), m_objecter_finishers(cct->_conf->osd_objecter_finishers), - watch_lock("OSDService::watch_lock"), watch_timer(osd->client_messenger->cct, watch_lock), next_notif_id(0), - recovery_request_lock("OSDService::recovery_request_lock"), recovery_request_timer(cct, recovery_request_lock, false), - sleep_lock("OSDService::sleep_lock"), sleep_timer(cct, sleep_lock, false), reserver_finisher(cct), local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills, cct->_conf->osd_min_recovery_priority), remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills, cct->_conf->osd_min_recovery_priority), - pg_temp_lock("OSDService::pg_temp_lock"), snap_reserver(cct, &reserver_finisher, cct->_conf->osd_max_trimming_pgs), - recovery_lock("OSDService::recovery_lock"), recovery_ops_active(0), recovery_ops_reserved(0), recovery_paused(false), - map_cache_lock("OSDService::map_cache_lock"), map_cache(cct, cct->_conf->osd_map_cache_size), map_bl_cache(cct->_conf->osd_map_cache_size), map_bl_inc_cache(cct->_conf->osd_map_cache_size), - stat_lock("OSDService::stat_lock"), - full_status_lock("OSDService::full_status_lock"), cur_state(NONE), cur_ratio(0), physical_ratio(0), - epoch_lock("OSDService::epoch_lock"), boot_epoch(0), up_epoch(0), bind_epoch(0) -#ifdef PG_DEBUG_REFS - , pgid_lock("OSDService::pgid_lock") -#endif { objecter->init(); @@ -506,12 +492,11 @@ void OSDService::final_init() void OSDService::activate_map() { // wake/unwake the tiering agent - agent_lock.Lock(); + std::lock_guard l{agent_lock}; agent_active = !osdmap->test_flag(CEPH_OSDMAP_NOTIERAGENT) && osd->is_active(); - agent_cond.Signal(); - agent_lock.Unlock(); + agent_cond.notify_all(); } void OSDService::request_osdmap_update(epoch_t e) @@ -531,12 +516,12 @@ public: void OSDService::agent_entry() { dout(10) << __func__ << " start" << dendl; - agent_lock.Lock(); + std::unique_lock agent_locker{agent_lock}; while (!agent_stop_flag) { if (agent_queue.empty()) { dout(20) << __func__ << " empty queue" << dendl; - agent_cond.Wait(agent_lock); + agent_cond.wait(agent_locker); continue; } uint64_t level = agent_queue.rbegin()->first; @@ -555,7 +540,7 @@ void OSDService::agent_entry() if (!flush_mode_high_count) agent_flush_quota = cct->_conf->osd_agent_max_low_ops - agent_ops; if (agent_flush_quota <= 0 || top.empty() || !agent_active) { - agent_cond.Wait(agent_lock); + agent_cond.wait(agent_locker); continue; } @@ -567,7 +552,7 @@ void OSDService::agent_entry() dout(10) << "high_count " << flush_mode_high_count << " agent_ops " << agent_ops << " flush_quota " << agent_flush_quota << dendl; - agent_lock.Unlock(); + agent_locker.unlock(); if (!pg->agent_work(max, agent_flush_quota)) { dout(10) << __func__ << " " << pg->pg_id << " no agent_work, delay for " << cct->_conf->osd_agent_delay_time @@ -575,14 +560,12 @@ void OSDService::agent_entry() osd->logger->inc(l_osd_tier_delay); // Queue a timer to call agent_choose_mode for this pg in 5 seconds - agent_timer_lock.Lock(); + std::lock_guard timer_locker{agent_timer_lock}; Context *cb = new AgentTimeoutCB(pg); agent_timer.add_event_after(cct->_conf->osd_agent_delay_time, cb); - agent_timer_lock.Unlock(); } - agent_lock.Lock(); + agent_locker.lock(); } - agent_lock.Unlock(); dout(10) << __func__ << " finish" << dendl; } @@ -601,7 +584,7 @@ void OSDService::agent_stop() } agent_stop_flag = true; - agent_cond.Signal(); + agent_cond.notify_all(); } agent_thread.join(); } @@ -1189,8 +1172,7 @@ bool OSDService::can_inc_scrubs_pending() bool OSDService::inc_scrubs_pending() { bool result = false; - - sched_scrub_lock.Lock(); + std::lock_guard l{sched_scrub_lock}; if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) { dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1) << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl; @@ -1199,24 +1181,21 @@ bool OSDService::inc_scrubs_pending() } else { dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl; } - sched_scrub_lock.Unlock(); - return result; } void OSDService::dec_scrubs_pending() { - sched_scrub_lock.Lock(); + std::lock_guard l{sched_scrub_lock}; dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1) << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl; --scrubs_pending; ceph_assert(scrubs_pending >= 0); - sched_scrub_lock.Unlock(); } void OSDService::inc_scrubs_active(bool reserved) { - sched_scrub_lock.Lock(); + std::lock_guard l{sched_scrub_lock}; ++(scrubs_active); if (reserved) { --(scrubs_pending); @@ -1229,17 +1208,15 @@ void OSDService::inc_scrubs_active(bool reserved) << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl; } - sched_scrub_lock.Unlock(); } void OSDService::dec_scrubs_active() { - sched_scrub_lock.Lock(); + std::lock_guard l{sched_scrub_lock}; dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1) << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl; --scrubs_active; ceph_assert(scrubs_active >= 0); - sched_scrub_lock.Unlock(); } void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch, @@ -1842,7 +1819,7 @@ void OSDService::_queue_for_recovery( std::pair p, uint64_t reserved_pushes) { - ceph_assert(recovery_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock)); enqueue_back( OpQueueItem( unique_ptr( @@ -2069,9 +2046,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, MonClient *mc, const std::string &dev, const std::string &jdev) : Dispatcher(cct_), - osd_lock("OSD::osd_lock"), tick_timer(cct, osd_lock), - tick_timer_lock("OSD::tick_timer_lock"), tick_timer_without_osd_lock(cct, tick_timer_lock), gss_ktfile_client(cct->_conf.get_val("gss_ktab_client_file")), cluster_messenger(internal_messenger), @@ -2095,9 +2070,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp", get_num_op_threads()), command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1), - session_waiting_lock("OSD::session_waiting_lock"), - osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"), - heartbeat_lock("OSD::heartbeat_lock"), heartbeat_stop(false), heartbeat_need_update(true), hb_front_client_messenger(hb_client_front), @@ -2117,9 +2089,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, cct->_conf->osd_op_thread_timeout, cct->_conf->osd_op_thread_suicide_timeout, &osd_op_tp), - map_lock("OSD::map_lock"), last_pg_create_epoch(0), - mon_report_lock("OSD::mon_report_lock"), boot_finisher(cct), up_thru_wanted(0), requested_full_first(0), @@ -3051,7 +3021,7 @@ int OSD::init() new C_Tick_WithoutOSDLock(this)); } - osd_lock.Unlock(); + osd_lock.unlock(); r = monc->authenticate(); if (r < 0) { @@ -3083,7 +3053,7 @@ int OSD::init() exit(1); } - osd_lock.Lock(); + osd_lock.lock(); if (is_stopping()) return 0; @@ -3391,9 +3361,9 @@ int OSD::shutdown() { if (!service.prepare_to_stop()) return 0; // already shutting down - osd_lock.Lock(); + osd_lock.lock(); if (is_stopping()) { - osd_lock.Unlock(); + osd_lock.unlock(); return 0; } dout(0) << "shutdown" << dendl; @@ -3444,12 +3414,13 @@ int OSD::shutdown() delete test_ops_hook; test_ops_hook = NULL; - osd_lock.Unlock(); + osd_lock.unlock(); - heartbeat_lock.Lock(); - heartbeat_stop = true; - heartbeat_cond.Signal(); - heartbeat_lock.Unlock(); + { + std::lock_guard l{heartbeat_lock}; + heartbeat_stop = true; + heartbeat_cond.notify_all(); + } heartbeat_thread.join(); osd_op_tp.drain(); @@ -3465,7 +3436,7 @@ int OSD::shutdown() boot_finisher.wait_for_empty(); - osd_lock.Lock(); + osd_lock.lock(); boot_finisher.stop(); reset_heartbeat_peers(true); @@ -3526,9 +3497,9 @@ int OSD::shutdown() service.dump_live_pgids(); #endif - osd_lock.Unlock(); + osd_lock.unlock(); cct->_conf.remove_observer(this); - osd_lock.Lock(); + osd_lock.lock(); service.meta_ch.reset(); @@ -3541,12 +3512,11 @@ int OSD::shutdown() } monc->shutdown(); - osd_lock.Unlock(); - - map_lock.get_write(); - osdmap = OSDMapRef(); - map_lock.put_write(); - + osd_lock.unlock(); + { + std::unique_lock l{map_lock}; + osdmap = OSDMapRef(); + } for (auto s : shards) { std::lock_guard l(s->osdmap_lock); s->shard_osdmap = OSDMapRef(); @@ -3970,7 +3940,7 @@ PGRef OSD::lookup_lock_pg(spg_t pgid) void OSD::load_pgs() { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(0) << "load_pgs" << dendl; { @@ -4159,7 +4129,7 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap, pg->init_collection_pool_opts(); if (pg->is_primary()) { - Mutex::Locker locker(m_perf_queries_lock); + std::lock_guard locker{m_perf_queries_lock}; pg->set_dynamic_perf_stats_queries(m_perf_queries); } @@ -4416,7 +4386,7 @@ void OSD::need_heartbeat_peer_update() void OSD::maybe_update_heartbeat_peers() { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); if (is_waiting_for_healthy() || is_active()) { utime_t now = ceph_clock_now(); @@ -4523,7 +4493,7 @@ void OSD::maybe_update_heartbeat_peers() void OSD::reset_heartbeat_peers(bool all) { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(10) << "reset_heartbeat_peers" << dendl; utime_t stale = ceph_clock_now(); stale -= cct->_conf.get_val("osd_heartbeat_stale"); @@ -4555,16 +4525,16 @@ void OSD::handle_osd_ping(MOSDPing *m) int from = m->get_source().num(); - heartbeat_lock.Lock(); + heartbeat_lock.lock(); if (is_stopping()) { - heartbeat_lock.Unlock(); + heartbeat_lock.unlock(); m->put(); return; } OSDMapRef curmap = service.get_osdmap(); if (!curmap) { - heartbeat_lock.Unlock(); + heartbeat_lock.unlock(); m->put(); return; } @@ -4720,23 +4690,22 @@ void OSD::handle_osd_ping(MOSDPing *m) break; } - heartbeat_lock.Unlock(); + heartbeat_lock.unlock(); m->put(); } void OSD::heartbeat_entry() { - std::lock_guard l(heartbeat_lock); + std::unique_lock l(heartbeat_lock); if (is_stopping()) return; while (!heartbeat_stop) { heartbeat(); double wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval; - utime_t w; - w.set_from_double(wait); + auto w = ceph::make_timespan(wait); dout(30) << "heartbeat_entry sleeping for " << wait << dendl; - heartbeat_cond.WaitInterval(heartbeat_lock, w); + heartbeat_cond.wait_for(l, w); if (is_stopping()) return; dout(30) << "heartbeat_entry woke up" << dendl; @@ -4745,7 +4714,7 @@ void OSD::heartbeat_entry() void OSD::heartbeat_check() { - ceph_assert(heartbeat_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(heartbeat_lock)); utime_t now = ceph_clock_now(); // check for incoming heartbeats (move me elsewhere?) @@ -4794,7 +4763,7 @@ void OSD::heartbeat_check() void OSD::heartbeat() { - ceph_assert(heartbeat_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock)); dout(30) << "heartbeat" << dendl; // get CPU load avg @@ -4924,7 +4893,7 @@ bool OSD::heartbeat_reset(Connection *con) void OSD::tick() { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(10) << "tick" << dendl; if (is_active() || is_waiting_for_healthy()) { @@ -4975,7 +4944,7 @@ void OSD::tick() void OSD::tick_without_osd_lock() { - ceph_assert(tick_timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(tick_timer_lock)); dout(10) << "tick_without_osd_lock" << dendl; logger->set(l_osd_cached_crc, buffer::get_cached_crc()); @@ -4992,11 +4961,11 @@ void OSD::tick_without_osd_lock() // osd_lock is not being held, which means the OSD state // might change when doing the monitor report if (is_active() || is_waiting_for_healthy()) { - heartbeat_lock.Lock(); - heartbeat_check(); - heartbeat_lock.Unlock(); - - map_lock.get_read(); + { + std::lock_guard l{heartbeat_lock}; + heartbeat_check(); + } + map_lock.lock_shared(); std::lock_guard l(mon_report_lock); // mon report? @@ -5007,7 +4976,7 @@ void OSD::tick_without_osd_lock() send_full_update(); send_failures(); } - map_lock.put_read(); + map_lock.unlock_shared(); epoch_t max_waiting_epoch = 0; for (auto s : shards) { @@ -5303,7 +5272,7 @@ void OSD::ms_handle_connect(Connection *con) } else if (is_booting()) { _send_boot(); // resend boot message } else { - map_lock.get_read(); + map_lock.lock_shared(); std::lock_guard l2(mon_report_lock); utime_t now = ceph_clock_now(); @@ -5320,7 +5289,7 @@ void OSD::ms_handle_connect(Connection *con) requeue_failures(); send_failures(); - map_lock.put_read(); + map_lock.unlock_shared(); if (is_active()) { send_beacon(ceph::coarse_mono_clock::now()); } @@ -5506,15 +5475,15 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest) boot_finisher.queue( new FunctionContext( [this](int r) { - std::lock_guard l(osd_lock); + std::unique_lock l(osd_lock); if (is_preboot()) { dout(10) << __func__ << " waiting for peering work to drain" << dendl; - osd_lock.Unlock(); + l.unlock(); for (auto shard : shards) { shard->wait_min_pg_epoch(osdmap->get_epoch()); } - osd_lock.Lock(); + l.lock(); } if (is_preboot()) { _send_boot(); @@ -5788,9 +5757,9 @@ void OSD::_collect_metadata(map *pm) void OSD::queue_want_up_thru(epoch_t want) { - map_lock.get_read(); + std::shared_lock map_locker{map_lock}; epoch_t cur = osdmap->get_up_thru(whoami); - std::lock_guard l(mon_report_lock); + std::lock_guard report_locker(mon_report_lock); if (want > up_thru_wanted) { dout(10) << "queue_want_up_thru now " << want << " (was " << up_thru_wanted << ")" << ", currently " << cur @@ -5802,12 +5771,11 @@ void OSD::queue_want_up_thru(epoch_t want) << ", currently " << cur << dendl; } - map_lock.put_read(); } void OSD::send_alive() { - ceph_assert(mon_report_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(mon_report_lock)); if (!osdmap->exists(whoami)) return; epoch_t up_thru = osdmap->get_up_thru(whoami); @@ -5823,7 +5791,7 @@ void OSD::request_full_map(epoch_t first, epoch_t last) dout(10) << __func__ << " " << first << ".." << last << ", previously requested " << requested_full_first << ".." << requested_full_last << dendl; - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); ceph_assert(first > 0 && last > 0); ceph_assert(first <= last); ceph_assert(first >= requested_full_first); // we shouldn't ever ask for older maps @@ -5847,7 +5815,7 @@ void OSD::request_full_map(epoch_t first, epoch_t last) void OSD::got_full_map(epoch_t e) { ceph_assert(requested_full_first <= requested_full_last); - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); if (requested_full_first == 0) { dout(20) << __func__ << " " << e << ", nothing requested" << dendl; return; @@ -5887,8 +5855,8 @@ void OSD::requeue_failures() void OSD::send_failures() { - ceph_assert(map_lock.is_locked()); - ceph_assert(mon_report_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(map_lock)); + ceph_assert(ceph_mutex_is_locked(mon_report_lock)); std::lock_guard l(heartbeat_lock); utime_t now = ceph_clock_now(); while (!failure_queue.empty()) { @@ -6155,9 +6123,9 @@ void OSD::do_command( namespace { class unlock_guard { - Mutex& m; + ceph::mutex& m; public: - explicit unlock_guard(Mutex& mutex) + explicit unlock_guard(ceph::mutex& mutex) : m(mutex) { m.unlock(); @@ -6692,7 +6660,7 @@ void OSD::scrub_purged_snaps() pg->queue_snap_retrim(snap); pg->unlock(); } - osd_lock.Lock(); + osd_lock.lock(); if (is_stopping()) { return; } @@ -6779,9 +6747,9 @@ bool OSD::ms_dispatch(Message *m) // lock! - osd_lock.Lock(); + osd_lock.lock(); if (is_stopping()) { - osd_lock.Unlock(); + osd_lock.unlock(); m->put(); return true; } @@ -6789,7 +6757,7 @@ bool OSD::ms_dispatch(Message *m) do_waiters(); _dispatch(m); - osd_lock.Unlock(); + osd_lock.unlock(); return true; } @@ -6840,7 +6808,7 @@ void OSDService::maybe_share_map( void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap) { - ceph_assert(session->session_dispatch_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock)); auto i = session->waiting_on_map.begin(); while (i != session->waiting_on_map.end()) { @@ -7020,7 +6988,7 @@ int OSD::ms_handle_authentication(Connection *con) void OSD::do_waiters() { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(10) << "do_waiters -- start" << dendl; while (!finished.empty()) { @@ -7043,7 +7011,7 @@ void OSD::dispatch_op(OpRequestRef op) void OSD::_dispatch(Message *m) { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(20) << "_dispatch " << m << " " << *m << dendl; switch (m->get_type()) { @@ -7391,7 +7359,7 @@ MPGStats* OSD::collect_pg_stats() // stats every time we're called. This has equivalent cost to the // previous implementation's worst case where all PGs are busy and // their stats are always enqueued for sending. - RWLock::RLocker l(map_lock); + std::shared_lock l{map_lock}; osd_stat_t cur_stat = service.get_osd_stat(); cur_stat.os_perf_stat = store->get_cur_stats(); @@ -7508,10 +7476,10 @@ void OSD::wait_for_new_map(OpRequestRef op) void OSD::note_down_osd(int peer) { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); cluster_messenger->mark_down_addrs(osdmap->get_cluster_addrs(peer)); - heartbeat_lock.Lock(); + std::lock_guard l{heartbeat_lock}; failure_queue.erase(peer); failure_pending.erase(peer); map::iterator p = heartbeat_peers.find(peer); @@ -7522,7 +7490,6 @@ void OSD::note_down_osd(int peer) } heartbeat_peers.erase(p); } - heartbeat_lock.Unlock(); } void OSD::note_up_osd(int peer) @@ -7635,7 +7602,7 @@ void OSD::handle_osd_map(MOSDMap *m) } } - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); map added_maps; map added_maps_bl; if (m->fsid != monc->get_fsid()) { @@ -7923,7 +7890,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m) dout(10) << __func__ << " bailing, we are shutting down" << dendl; return; } - map_lock.get_write(); + map_lock.lock(); bool do_shutdown = false; bool do_restart = false; @@ -8128,7 +8095,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m) } } - map_lock.put_write(); + map_lock.unlock(); check_osdmap_features(); @@ -8480,7 +8447,7 @@ bool OSD::advance_pg( void OSD::consume_map() { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(7) << "consume_map version " << osdmap->get_epoch() << dendl; /** make sure the cluster is speaking in SORTBITWISE, because we don't @@ -8597,7 +8564,7 @@ void OSD::consume_map() void OSD::activate_map() { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); dout(7) << "activate_map version " << osdmap->get_epoch() << dendl; @@ -8693,12 +8660,12 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map, auto priv = con->get_priv(); if (auto s = static_cast(priv.get()); s) { if (!is_fast_dispatch) - s->session_dispatch_lock.Lock(); + s->session_dispatch_lock.lock(); clear_session_waiting_on_map(s); con->set_priv(nullptr); // break ref <-> session cycle, if any s->con.reset(); if (!is_fast_dispatch) - s->session_dispatch_lock.Unlock(); + s->session_dispatch_lock.unlock(); } return false; } @@ -8717,7 +8684,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch, dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl; - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); // do they have a newer map? if (epoch > osdmap->get_epoch()) { @@ -9298,7 +9265,7 @@ void OSD::handle_pg_query_nopg(const MQuery& q) // RECOVERY void OSDService::_maybe_queue_recovery() { - ceph_assert(recovery_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock)); uint64_t available_pushes; while (!awaiting_throttle.empty() && _recover_now(&available_pushes)) { @@ -9373,12 +9340,14 @@ void OSD::do_recovery( // This is true for the first recovery op and when the previous recovery op // has been scheduled in the past. The next recovery op is scheduled after // completing the sleep from now. - if (service.recovery_schedule_time < ceph_clock_now()) { - service.recovery_schedule_time = ceph_clock_now(); + + if (auto now = ceph::real_clock::now(); + service.recovery_schedule_time < now) { + service.recovery_schedule_time = now; } - service.recovery_schedule_time += recovery_sleep; + service.recovery_schedule_time += ceph::make_timespan(recovery_sleep); service.sleep_timer.add_event_at(service.recovery_schedule_time, - recovery_requeue_callback); + recovery_requeue_callback); dout(20) << "Recovery event scheduled at " << service.recovery_schedule_time << dendl; return; @@ -9679,7 +9648,7 @@ const char** OSD::get_tracked_conf_keys() const void OSD::handle_conf_change(const ConfigProxy& conf, const std::set &changed) { - Mutex::Locker l(osd_lock); + std::lock_guard l{osd_lock}; if (changed.count("osd_max_backfills")) { service.local_reserver.set_max(cct->_conf->osd_max_backfills); service.remote_reserver.set_max(cct->_conf->osd_max_backfills); @@ -10023,13 +9992,11 @@ void OSD::set_perf_queries( dout(1) << queries.size() - supported_queries.size() << " unsupported queries" << dendl; } - { - Mutex::Locker locker(m_perf_queries_lock); + std::lock_guard locker{m_perf_queries_lock}; m_perf_queries = supported_queries; m_perf_limits = queries; } - std::vector pgs; _get_pgs(&pgs); for (auto& pg : pgs) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 95e772bd8e5..67ec62159d5 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -19,8 +19,6 @@ #include "msg/Dispatcher.h" -#include "common/Mutex.h" -#include "common/RWLock.h" #include "common/Timer.h" #include "common/WorkQueue.h" #include "common/AsyncReserver.h" @@ -273,7 +271,7 @@ public: private: // -- scrub scheduling -- - Mutex sched_scrub_lock; + ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock"); int scrubs_pending; int scrubs_active; @@ -363,8 +361,8 @@ public: private: // -- agent shared state -- - Mutex agent_lock; - Cond agent_cond; + ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock"); + ceph::condition_variable agent_cond; map > agent_queue; set::iterator agent_queue_pos; bool agent_valid_iterator; @@ -381,7 +379,7 @@ private: } } agent_thread; bool agent_stop_flag; - Mutex agent_timer_lock; + ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock"); SafeTimer agent_timer; public: @@ -394,7 +392,7 @@ public: agent_valid_iterator = false; // inserting higher-priority queue set& nq = agent_queue[priority]; if (nq.empty()) - agent_cond.Signal(); + agent_cond.notify_all(); nq.insert(pg); } @@ -443,7 +441,7 @@ public: std::lock_guard l(agent_lock); ceph_assert(agent_ops > 0); --agent_ops; - agent_cond.Signal(); + agent_cond.notify_all(); } /// note start of an async (flush) op @@ -461,7 +459,7 @@ public: --agent_ops; ceph_assert(agent_oids.count(oid) == 1); agent_oids.erase(oid); - agent_cond.Signal(); + agent_cond.notify_all(); } /// check if we are operating on an object @@ -518,7 +516,7 @@ public: vector objecter_finishers; // -- Watch -- - Mutex watch_lock; + ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock"); SafeTimer watch_timer; uint64_t next_notif_id; uint64_t get_next_id(epoch_t cur_epoch) { @@ -527,15 +525,15 @@ public: } // -- Recovery/Backfill Request Scheduling -- - Mutex recovery_request_lock; + ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock"); SafeTimer recovery_request_timer; // For async recovery sleep bool recovery_needs_sleep = true; - utime_t recovery_schedule_time = utime_t(); + ceph::real_clock::time_point recovery_schedule_time; // For recovery & scrub & snap - Mutex sleep_lock; + ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock"); SafeTimer sleep_timer; // -- tids -- @@ -551,7 +549,7 @@ public: AsyncReserver remote_reserver; // -- pg merge -- - Mutex merge_lock = {"OSD::merge_lock"}; + ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock"); map ready_to_merge_source; // pg -> version map> ready_to_merge_target; // pg -> (version,les,lec) set not_ready_to_merge_source; @@ -574,7 +572,7 @@ public: // -- pg_temp -- private: - Mutex pg_temp_lock; + ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock"); struct pg_temp_t { vector acting; bool forced = false; @@ -605,7 +603,7 @@ public: private: // -- pg recovery and associated throttling -- - Mutex recovery_lock; + ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock"); list > awaiting_throttle; utime_t defer_recovery_until; @@ -669,7 +667,7 @@ public: } // osd map cache (past osd maps) - Mutex map_cache_lock; + ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock"); SharedLRU map_cache; SimpleLRU map_bl_cache; SimpleLRU map_bl_inc_cache; @@ -741,7 +739,7 @@ public: void shutdown(); // -- stats -- - Mutex stat_lock; + ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock"); osd_stat_t osd_stat; uint32_t seq = 0; @@ -765,7 +763,7 @@ public: // -- OSD Full Status -- private: friend TestOpsSocketHook; - mutable Mutex full_status_lock; + mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock"); enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending const char *get_full_state_name(s_names s) const { switch (s) { @@ -816,7 +814,8 @@ public: // -- epochs -- private: - mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch + // protects access to boot_epoch, up_epoch, bind_epoch + mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock"); epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started) epoch_t up_epoch; // _most_recent_ epoch we were marked up epoch_t bind_epoch; // epoch we last did a bind to new ip:ports @@ -875,7 +874,7 @@ public: #ifdef PG_DEBUG_REFS - Mutex pgid_lock; + ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock"); map pgid_tracker; map live_pgs; void add_pgid(spg_t pgid, PG *pg); @@ -1100,11 +1099,12 @@ struct OSDShard { class OSD : public Dispatcher, public md_config_obs_t { /** OSD **/ - Mutex osd_lock; // global lock + // global lock + ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock"); SafeTimer tick_timer; // safe timer (osd_lock) // Tick timer for those stuff that do not need osd_lock - Mutex tick_timer_lock; + ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock"); SafeTimer tick_timer_without_osd_lock; std::string gss_ktfile_client{}; @@ -1329,7 +1329,7 @@ private: private: void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap); - Mutex session_waiting_lock; + ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock"); set session_waiting_for_map; /// Caller assumes refs for included Sessions @@ -1388,7 +1388,7 @@ private: void osdmap_subscribe(version_t epoch, bool force_request); /** @} monc helpers */ - Mutex osdmap_subscribe_lock; + ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock"); epoch_t latest_subscribed_epoch{0}; // -- heartbeat -- @@ -1441,9 +1441,9 @@ private: int peer; explicit HeartbeatSession(int p) : peer(p) {} }; - Mutex heartbeat_lock; + ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock"); map debug_heartbeat_drops_remaining; - Cond heartbeat_cond; + ceph::condition_variable heartbeat_cond; bool heartbeat_stop; std::atomic heartbeat_need_update; map heartbeat_peers; ///< map of osd id to HeartbeatInfo @@ -1476,7 +1476,7 @@ private: void heartbeat_kick() { std::lock_guard l(heartbeat_lock); - heartbeat_cond.Signal(); + heartbeat_cond.notify_all(); } struct T_Heartbeat : public Thread { @@ -1528,7 +1528,7 @@ private: list finished; void take_waiters(list& ls) { - ceph_assert(osd_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(osd_lock)); finished.splice(finished.end(), ls); } void do_waiters(); @@ -1697,7 +1697,7 @@ protected: pool_pg_num_history_t pg_num_history; - RWLock map_lock; + ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock"); list waiting_for_osdmap; deque osd_markdown_log; @@ -1752,7 +1752,7 @@ public: } protected: - Mutex merge_lock = {"OSD::merge_lock"}; + ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock"); /// merge epoch -> target pgid -> source pgid -> pg map>> merge_waiters; @@ -1812,7 +1812,7 @@ protected: void _finish_splits(set& pgs); // == monitor interaction == - Mutex mon_report_lock; + ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock"); utime_t last_mon_report; Finisher boot_finisher; @@ -1861,7 +1861,7 @@ protected: void cancel_pending_failures(); ceph::coarse_mono_clock::time_point last_sent_beacon; - Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"}; + ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock"); epoch_t min_last_epoch_clean = 0; // which pgs were scanned for min_lec std::vector min_last_epoch_clean_pgs; @@ -2178,7 +2178,7 @@ private: void get_perf_reports( std::map *reports); - Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"}; + ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock"); std::list m_perf_queries; std::map m_perf_limits; }; diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc index 42c8d997d31..ba59c21dfbe 100644 --- a/src/osd/OSDMapMapping.cc +++ b/src/osd/OSDMapMapping.cc @@ -135,7 +135,7 @@ void ParallelPGMapper::Job::finish_one() finish = ceph_clock_now(); complete(); } - cond.Signal(); + cond.notify_all(); fin = onfinish; onfinish = nullptr; } diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h index 5435ef7b7ac..37ec74f6f50 100644 --- a/src/osd/OSDMapMapping.h +++ b/src/osd/OSDMapMapping.h @@ -24,8 +24,8 @@ public: bool aborted = false; Context *onfinish = nullptr; - Mutex lock = {"ParallelPGMapper::Job::lock"}; - Cond cond; + ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock"); + ceph::condition_variable cond; Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {} virtual ~Job() { @@ -38,15 +38,15 @@ public: virtual void complete() = 0; void set_finish_event(Context *fin) { - lock.Lock(); + lock.lock(); if (shards == 0) { // already done. - lock.Unlock(); + lock.unlock(); fin->complete(0); } else { // set finisher onfinish = fin; - lock.Unlock(); + lock.unlock(); } } bool is_done() { @@ -57,33 +57,29 @@ public: return finish - start; } void wait() { - std::lock_guard l(lock); - while (shards > 0) { - cond.Wait(lock); - } + std::unique_lock l(lock); + cond.wait(l, [this] { return shards == 0; }); } bool wait_for(double duration) { utime_t until = start; until += duration; - std::lock_guard l(lock); + std::unique_lock l(lock); while (shards > 0) { if (ceph_clock_now() >= until) { return false; } - cond.Wait(lock); + cond.wait(l); } return true; } void abort() { Context *fin = nullptr; { - std::lock_guard l(lock); + std::unique_lock l(lock); aborted = true; fin = onfinish; onfinish = nullptr; - while (shards > 0) { - cond.Wait(lock); - } + cond.wait(l, [this] { return shards == 0; }); } if (fin) { fin->complete(-ECANCELED); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index ac8ec080e39..64c99990a7a 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -191,12 +191,9 @@ PG::PG(OSDService *o, OSDMapRef curmap, scrub_queued(false), recovery_queued(false), recovery_ops_active(0), - heartbeat_peer_lock("PG::heartbeat_peer_lock"), backfill_reserving(false), - pg_stats_publish_lock("PG::pg_stats_publish_lock"), pg_stats_publish_valid(false), finish_sync_event(NULL), - backoff_lock("PG::backoff_lock"), scrub_after_recovery(false), active_pushes(0), recovery_state( @@ -229,7 +226,7 @@ PG::~PG() void PG::lock(bool no_lockdep) const { - _lock.Lock(no_lockdep); + _lock.lock(no_lockdep); // if we have unrecorded dirty state with the lock dropped, there is a bug ceph_assert(!recovery_state.debug_has_dirty_state()); @@ -707,7 +704,7 @@ void PG::rm_backoff(BackoffRef b) { dout(10) << __func__ << " " << *b << dendl; std::lock_guard l(backoff_lock); - ceph_assert(b->lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(b->lock)); ceph_assert(b->pg == this); auto p = backoffs.find(b->begin); // may race with release_backoffs() @@ -783,7 +780,7 @@ void PG::clear_probe_targets() void PG::update_heartbeat_peers(set new_peers) { bool need_update = false; - heartbeat_peer_lock.Lock(); + heartbeat_peer_lock.lock(); if (new_peers == heartbeat_peers) { dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl; } else { @@ -791,7 +788,7 @@ void PG::update_heartbeat_peers(set new_peers) heartbeat_peers.swap(new_peers); need_update = true; } - heartbeat_peer_lock.Unlock(); + heartbeat_peer_lock.unlock(); if (need_update) osd->need_heartbeat_peer_update(); @@ -815,8 +812,7 @@ void PG::publish_stats_to_osd() if (!is_primary()) return; - pg_stats_publish_lock.Lock(); - + std::lock_guard l{pg_stats_publish_lock}; auto stats = recovery_state.prepare_stats_for_publish( pg_stats_publish_valid, pg_stats_publish, @@ -825,16 +821,13 @@ void PG::publish_stats_to_osd() pg_stats_publish = stats.value(); pg_stats_publish_valid = true; } - - pg_stats_publish_lock.Unlock(); } void PG::clear_publish_stats() { dout(15) << "clear_stats" << dendl; - pg_stats_publish_lock.Lock(); + std::lock_guard l{pg_stats_publish_lock}; pg_stats_publish_valid = false; - pg_stats_publish_lock.Unlock(); } /** @@ -1991,7 +1984,7 @@ bool PG::try_reserve_recovery_space( // This lock protects not only the stats OSDService but also setting the // pg primary_bytes. That's why we don't immediately unlock - Mutex::Locker l(osd->stat_lock); + std::lock_guard l{osd->stat_lock}; osd_stat_t cur_stat = osd->osd_stat; if (cct->_conf->osd_debug_reject_backfill_probability > 0 && (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) { @@ -2180,21 +2173,19 @@ void PG::_scan_snaps(ScrubMap &smap) // wait for repair to apply to avoid confusing other bits of the system. { - Cond my_cond; - Mutex my_lock("PG::_scan_snaps my_lock"); + ceph::condition_variable my_cond; + ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock"); int r = 0; bool done; t.register_on_applied_sync( - new C_SafeCond(&my_lock, &my_cond, &done, &r)); + new C_SafeCond(my_lock, my_cond, &done, &r)); r = osd->store->queue_transaction(ch, std::move(t)); if (r != 0) { derr << __func__ << ": queue_transaction got " << cpp_strerror(r) << dendl; } else { - my_lock.Lock(); - while (!done) - my_cond.Wait(my_lock); - my_lock.Unlock(); + std::unique_lock l{my_lock}; + my_cond.wait(l, [&done] { return done;}); } } } @@ -3732,11 +3723,11 @@ void PG::do_delete_work(ObjectStore::Transaction &t) unlock(); }); - utime_t delete_schedule_time = ceph_clock_now(); - delete_schedule_time += osd_delete_sleep; - Mutex::Locker l(osd->sleep_lock); + auto delete_schedule_time = ceph::real_clock::now(); + delete_schedule_time += ceph::make_timespan(osd_delete_sleep); + std::lock_guard l{osd->sleep_lock}; osd->sleep_timer.add_event_at(delete_schedule_time, - delete_requeue_callback); + delete_requeue_callback); dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl; return; } @@ -3883,23 +3874,21 @@ void PG::dump_missing(Formatter *f) void PG::get_pg_stats(std::function f) { - pg_stats_publish_lock.Lock(); + std::lock_guard l{pg_stats_publish_lock}; if (pg_stats_publish_valid) { f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean()); } - pg_stats_publish_lock.Unlock(); } void PG::with_heartbeat_peers(std::function f) { - heartbeat_peer_lock.Lock(); + std::lock_guard l{heartbeat_peer_lock}; for (auto p : heartbeat_peers) { f(p); } for (auto p : probe_targets) { f(p); } - heartbeat_peer_lock.Unlock(); } uint64_t PG::get_min_alloc_size() const { diff --git a/src/osd/PG.h b/src/osd/PG.h index e393b7b20ed..beffac1a8a1 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -98,10 +98,10 @@ class PGRecoveryStats { per_state_info() : enter(0), exit(0), events(0) {} }; map info; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock"); public: - PGRecoveryStats() : lock("PGRecoverStats::lock") {} + PGRecoveryStats() = default; void reset() { std::lock_guard l(lock); @@ -220,7 +220,7 @@ public: void unlock() const { //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl; ceph_assert(!recovery_state.debug_has_dirty_state()); - _lock.Unlock(); + _lock.unlock(); } bool is_locked() const { return _lock.is_locked(); @@ -607,12 +607,12 @@ protected: // get() should be called on pointer copy (to another thread, etc.). // put() should be called on destruction of some previously copied pointer. // unlock() when done with the current pointer (_most common_). - mutable Mutex _lock = {"PG::_lock"}; + mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock"); std::atomic ref{0}; #ifdef PG_DEBUG_REFS - Mutex _ref_id_lock = {"PG::_ref_id_lock"}; + ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock"); map _live_ids; map _tag_counts; uint64_t _ref_id = 0; @@ -686,7 +686,8 @@ protected: void set_probe_targets(const set &probe_set) override; void clear_probe_targets() override; - Mutex heartbeat_peer_lock; + ceph::mutex heartbeat_peer_lock = + ceph::make_mutex("PG::heartbeat_peer_lock"); set heartbeat_peers; set probe_targets; @@ -836,7 +837,7 @@ public: // The value of num_bytes could be negative, // but we don't let info.stats.stats.sum.num_bytes go negative. void add_num_bytes(int64_t num_bytes) { - ceph_assert(_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(_lock)); if (num_bytes) { recovery_state.update_stats( [num_bytes](auto &history, auto &stats) { @@ -849,7 +850,7 @@ public: } } void sub_num_bytes(int64_t num_bytes) { - ceph_assert(_lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(_lock)); ceph_assert(num_bytes >= 0); if (num_bytes) { recovery_state.update_stats( @@ -865,7 +866,7 @@ public: // Only used in testing so not worried about needing the PG lock here int64_t get_stats_num_bytes() { - Mutex::Locker l(_lock); + std::lock_guard l{_lock}; int num_bytes = info.stats.stats.sum.num_bytes; if (pool.info.is_erasure()) { num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count(); @@ -976,7 +977,8 @@ protected: object_stat_collection_t unstable_stats; // publish stats - Mutex pg_stats_publish_lock; + ceph::mutex pg_stats_publish_lock = + ceph::make_mutex("PG::pg_stats_publish_lock"); bool pg_stats_publish_valid; pg_stat_t pg_stats_publish; @@ -1060,7 +1062,8 @@ protected: friend class C_DeleteMore; // -- backoff -- - Mutex backoff_lock; // orders inside Backoff::lock + ceph::mutex backoff_lock = // orders inside Backoff::lock + ceph::make_mutex("PG::backoff_lock"); map> backoffs; void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end); diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index d85ce1d3a63..0d66d3c43dd 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1525,7 +1525,6 @@ PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap, PGBackend::build_pg_backend( _pool.info, ec_profile, this, coll_t(p), ch, o->store, cct)), object_contexts(o->cct, o->cct->_conf->osd_pg_object_context_cache_count), - snapset_contexts_lock("PrimaryLogPG::snapset_contexts_lock"), new_backfill(false), temp_seq(0), snap_trimmer_machine(this) diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index c50ebc722a5..3498a419f13 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -1007,7 +1007,8 @@ protected: SharedLRU object_contexts; // map from oid.snapdir() to SnapSetContext * map snapset_contexts; - Mutex snapset_contexts_lock; + ceph::mutex snapset_contexts_lock = + ceph::make_mutex("PrimaryLogPG::snapset_contexts_lock"); // debug order that client ops are applied map> debug_op_order; @@ -1053,7 +1054,7 @@ protected: _register_snapset_context(ssc); } void _register_snapset_context(SnapSetContext *ssc) { - ceph_assert(snapset_contexts_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(snapset_contexts_lock)); if (!ssc->registered) { ceph_assert(snapset_contexts.count(ssc->oid) == 0); ssc->registered = true; diff --git a/src/osd/Session.h b/src/osd/Session.h index e391200d746..ec01e0018e2 100644 --- a/src/osd/Session.h +++ b/src/osd/Session.h @@ -16,7 +16,7 @@ #define CEPH_OSD_SESSION_H #include "common/RefCountedObj.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "global/global_context.h" #include "include/spinlock.h" #include "OSDCap.h" @@ -96,7 +96,7 @@ struct Backoff : public RefCountedObject { } } - Mutex lock; + ceph::mutex lock = ceph::make_mutex("Backoff::lock"); // NOTE: the owning PG and session are either // - *both* set, or // - both null (teardown), or @@ -111,7 +111,6 @@ struct Backoff : public RefCountedObject { : RefCountedObject(g_ceph_context, 0), pgid(pgid), id(i), - lock("Backoff::lock"), pg(pg), session(s), begin(b), @@ -135,14 +134,15 @@ struct Session : public RefCountedObject { entity_addr_t socket_addr; WatchConState wstate; - Mutex session_dispatch_lock; + ceph::mutex session_dispatch_lock = + ceph::make_mutex("Session::session_dispatch_lock"); boost::intrusive::list waiting_on_map; ceph::spinlock sent_epoch_lock; epoch_t last_sent_epoch; /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock - Mutex backoff_lock; + ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock"); std::atomic backoff_count= {0}; ///< simple count of backoffs map>> backoffs; @@ -153,9 +153,7 @@ struct Session : public RefCountedObject { con(con_), socket_addr(con_->get_peer_socket_addr()), wstate(cct), - session_dispatch_lock("Session::session_dispatch_lock"), - last_sent_epoch(0), - backoff_lock("Session::backoff_lock") + last_sent_epoch(0) {} entity_addr_t& get_peer_socket_addr() { @@ -210,7 +208,7 @@ struct Session : public RefCountedObject { // called by PG::release_*_backoffs and PG::clear_backoffs() void rm_backoff(BackoffRef b) { std::lock_guard l(backoff_lock); - ceph_assert(b->lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(b->lock)); ceph_assert(b->session == this); auto i = backoffs.find(b->pgid); if (i != backoffs.end()) { diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index bb25b4487c1..4e13887e7ba 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -47,8 +47,7 @@ Notify::Notify( notify_id(notify_id), version(version), osd(osd), - cb(NULL), - lock("Notify::lock") {} + cb(NULL) {} NotifyRef Notify::makeNotifyRef( ConnectionRef client, @@ -75,27 +74,27 @@ class NotifyTimeoutCB : public CancelableContext { public: explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {} void finish(int) override { - notif->osd->watch_lock.Unlock(); - notif->lock.Lock(); + notif->osd->watch_lock.unlock(); + notif->lock.lock(); if (!canceled) notif->do_timeout(); // drops lock else - notif->lock.Unlock(); - notif->osd->watch_lock.Lock(); + notif->lock.unlock(); + notif->osd->watch_lock.lock(); } void cancel() override { - ceph_assert(notif->lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked(notif->lock)); canceled = true; } }; void Notify::do_timeout() { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked(lock)); dout(10) << "timeout" << dendl; cb = nullptr; if (is_discarded()) { - lock.Unlock(); + lock.unlock(); return; } @@ -104,7 +103,7 @@ void Notify::do_timeout() ceph_assert(complete); set _watchers; _watchers.swap(watchers); - lock.Unlock(); + lock.unlock(); for (set::iterator i = _watchers.begin(); i != _watchers.end(); @@ -120,28 +119,26 @@ void Notify::do_timeout() void Notify::register_cb() { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked(lock)); { - osd->watch_lock.Lock(); + std::lock_guard l{osd->watch_lock}; cb = new NotifyTimeoutCB(self.lock()); if (!osd->watch_timer.add_event_after(timeout, cb)) { cb = nullptr; } - osd->watch_lock.Unlock(); } } void Notify::unregister_cb() { - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked(lock)); if (!cb) return; cb->cancel(); { - osd->watch_lock.Lock(); + std::lock_guard l{osd->watch_lock}; osd->watch_timer.cancel_event(cb); cb = nullptr; - osd->watch_lock.Unlock(); } } @@ -245,14 +242,14 @@ public: OSDService *osd(watch->osd); ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl; boost::intrusive_ptr pg(watch->pg); - osd->watch_lock.Unlock(); + osd->watch_lock.unlock(); pg->lock(); watch->cb = nullptr; if (!watch->is_discarded() && !canceled) watch->pg->handle_watch_timeout(watch); delete this; // ~Watch requires pg lock! pg->unlock(); - osd->watch_lock.Lock(); + osd->watch_lock.lock(); } }; diff --git a/src/osd/Watch.h b/src/osd/Watch.h index 823a470b33c..3d3b09fd772 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -64,7 +64,7 @@ class Notify { OSDService *osd; CancelableContext *cb; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("Notify::lock"); /// (gid,cookie) -> reply_bl for everyone who acked the notify std::multimap, ceph::buffer::list> notify_replies; @@ -270,11 +270,11 @@ public: * Lives in the Session object of an OSD connection */ class WatchConState { - Mutex lock; + ceph::mutex lock = ceph::make_mutex("WatchConState"); std::set watches; public: CephContext* cct; - explicit WatchConState(CephContext* cct) : lock("WatchConState"), cct(cct) {} + explicit WatchConState(CephContext* cct) : cct(cct) {} /// Add a watch void addWatch( -- 2.39.5