]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 03:16:03 +0000 (11:16 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 03:27:18 +0000 (11:27 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OSDMapMapping.cc
src/osd/OSDMapMapping.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h
src/osd/Session.h
src/osd/Watch.cc
src/osd/Watch.h

index 6d4acc24a42d4d581c0537aaf901a91f170b8a28..1724ca213b7de4f2dd97efb10d275f110373c868 100644 (file)
@@ -232,54 +232,40 @@ OSDService::OSDService(OSD *osd) :
   publish_lock{ceph::make_mutex("OSDService::publish_lock")},
   pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
   max_oldest_map(0),
-  sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
+  scrubs_pending(0),
   scrubs_active(0),
-  agent_lock("OSDService::agent_lock"),
   agent_valid_iterator(false),
   agent_ops(0),
   flush_mode_high_count(0),
   agent_active(true),
   agent_thread(this),
   agent_stop_flag(false),
-  agent_timer_lock("OSDService::agent_timer_lock"),
   agent_timer(osd->client_messenger->cct, agent_timer_lock),
   last_recalibrate(ceph_clock_now()),
   promote_max_objects(0),
   promote_max_bytes(0),
   objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, NULL, 0, 0)),
   m_objecter_finishers(cct->_conf->osd_objecter_finishers),
-  watch_lock("OSDService::watch_lock"),
   watch_timer(osd->client_messenger->cct, watch_lock),
   next_notif_id(0),
-  recovery_request_lock("OSDService::recovery_request_lock"),
   recovery_request_timer(cct, recovery_request_lock, false),
-  sleep_lock("OSDService::sleep_lock"),
   sleep_timer(cct, sleep_lock, false),
   reserver_finisher(cct),
   local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                 cct->_conf->osd_min_recovery_priority),
   remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                  cct->_conf->osd_min_recovery_priority),
-  pg_temp_lock("OSDService::pg_temp_lock"),
   snap_reserver(cct, &reserver_finisher,
                cct->_conf->osd_max_trimming_pgs),
-  recovery_lock("OSDService::recovery_lock"),
   recovery_ops_active(0),
   recovery_ops_reserved(0),
   recovery_paused(false),
-  map_cache_lock("OSDService::map_cache_lock"),
   map_cache(cct, cct->_conf->osd_map_cache_size),
   map_bl_cache(cct->_conf->osd_map_cache_size),
   map_bl_inc_cache(cct->_conf->osd_map_cache_size),
-  stat_lock("OSDService::stat_lock"),
-  full_status_lock("OSDService::full_status_lock"),
   cur_state(NONE),
   cur_ratio(0), physical_ratio(0),
-  epoch_lock("OSDService::epoch_lock"),
   boot_epoch(0), up_epoch(0), bind_epoch(0)
-#ifdef PG_DEBUG_REFS
-  , pgid_lock("OSDService::pgid_lock")
-#endif
 {
   objecter->init();
 
@@ -506,12 +492,11 @@ void OSDService::final_init()
 void OSDService::activate_map()
 {
   // wake/unwake the tiering agent
-  agent_lock.Lock();
+  std::lock_guard l{agent_lock};
   agent_active =
     !osdmap->test_flag(CEPH_OSDMAP_NOTIERAGENT) &&
     osd->is_active();
-  agent_cond.Signal();
-  agent_lock.Unlock();
+  agent_cond.notify_all();
 }
 
 void OSDService::request_osdmap_update(epoch_t e)
@@ -531,12 +516,12 @@ public:
 void OSDService::agent_entry()
 {
   dout(10) << __func__ << " start" << dendl;
-  agent_lock.Lock();
+  std::unique_lock agent_locker{agent_lock};
 
   while (!agent_stop_flag) {
     if (agent_queue.empty()) {
       dout(20) << __func__ << " empty queue" << dendl;
-      agent_cond.Wait(agent_lock);
+      agent_cond.wait(agent_locker);
       continue;
     }
     uint64_t level = agent_queue.rbegin()->first;
@@ -555,7 +540,7 @@ void OSDService::agent_entry()
     if (!flush_mode_high_count)
       agent_flush_quota = cct->_conf->osd_agent_max_low_ops - agent_ops;
     if (agent_flush_quota <= 0 || top.empty() || !agent_active) {
-      agent_cond.Wait(agent_lock);
+      agent_cond.wait(agent_locker);
       continue;
     }
 
@@ -567,7 +552,7 @@ void OSDService::agent_entry()
     dout(10) << "high_count " << flush_mode_high_count
             << " agent_ops " << agent_ops
             << " flush_quota " << agent_flush_quota << dendl;
-    agent_lock.Unlock();
+    agent_locker.unlock();
     if (!pg->agent_work(max, agent_flush_quota)) {
       dout(10) << __func__ << " " << pg->pg_id
        << " no agent_work, delay for " << cct->_conf->osd_agent_delay_time
@@ -575,14 +560,12 @@ void OSDService::agent_entry()
 
       osd->logger->inc(l_osd_tier_delay);
       // Queue a timer to call agent_choose_mode for this pg in 5 seconds
-      agent_timer_lock.Lock();
+      std::lock_guard timer_locker{agent_timer_lock};
       Context *cb = new AgentTimeoutCB(pg);
       agent_timer.add_event_after(cct->_conf->osd_agent_delay_time, cb);
-      agent_timer_lock.Unlock();
     }
-    agent_lock.Lock();
+    agent_locker.lock();
   }
-  agent_lock.Unlock();
   dout(10) << __func__ << " finish" << dendl;
 }
 
@@ -601,7 +584,7 @@ void OSDService::agent_stop()
     }
 
     agent_stop_flag = true;
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
   agent_thread.join();
 }
@@ -1189,8 +1172,7 @@ bool OSDService::can_inc_scrubs_pending()
 bool OSDService::inc_scrubs_pending()
 {
   bool result = false;
-
-  sched_scrub_lock.Lock();
+  std::lock_guard l{sched_scrub_lock};
   if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
     dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
             << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
@@ -1199,24 +1181,21 @@ bool OSDService::inc_scrubs_pending()
   } else {
     dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
   }
-  sched_scrub_lock.Unlock();
-
   return result;
 }
 
 void OSDService::dec_scrubs_pending()
 {
-  sched_scrub_lock.Lock();
+  std::lock_guard l{sched_scrub_lock};
   dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
           << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
   --scrubs_pending;
   ceph_assert(scrubs_pending >= 0);
-  sched_scrub_lock.Unlock();
 }
 
 void OSDService::inc_scrubs_active(bool reserved)
 {
-  sched_scrub_lock.Lock();
+  std::lock_guard l{sched_scrub_lock};
   ++(scrubs_active);
   if (reserved) {
     --(scrubs_pending);
@@ -1229,17 +1208,15 @@ void OSDService::inc_scrubs_active(bool reserved)
             << " (max " << cct->_conf->osd_max_scrubs
             << ", pending " << scrubs_pending << ")" << dendl;
   }
-  sched_scrub_lock.Unlock();
 }
 
 void OSDService::dec_scrubs_active()
 {
-  sched_scrub_lock.Lock();
+  std::lock_guard l{sched_scrub_lock};
   dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
           << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
   --scrubs_active;
   ceph_assert(scrubs_active >= 0);
-  sched_scrub_lock.Unlock();
 }
 
 void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
@@ -1842,7 +1819,7 @@ void OSDService::_queue_for_recovery(
   std::pair<epoch_t, PGRef> p,
   uint64_t reserved_pushes)
 {
-  ceph_assert(recovery_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
   enqueue_back(
     OpQueueItem(
       unique_ptr<OpQueueItem::OpQueueable>(
@@ -2069,9 +2046,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
         MonClient *mc,
         const std::string &dev, const std::string &jdev) :
   Dispatcher(cct_),
-  osd_lock("OSD::osd_lock"),
   tick_timer(cct, osd_lock),
-  tick_timer_lock("OSD::tick_timer_lock"),
   tick_timer_without_osd_lock(cct, tick_timer_lock),
   gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
   cluster_messenger(internal_messenger),
@@ -2095,9 +2070,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
            get_num_op_threads()),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd",  1),
-  session_waiting_lock("OSD::session_waiting_lock"),
-  osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
-  heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false),
   heartbeat_need_update(true),
   hb_front_client_messenger(hb_client_front),
@@ -2117,9 +2089,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_op_thread_timeout,
     cct->_conf->osd_op_thread_suicide_timeout,
     &osd_op_tp),
-  map_lock("OSD::map_lock"),
   last_pg_create_epoch(0),
-  mon_report_lock("OSD::mon_report_lock"),
   boot_finisher(cct),
   up_thru_wanted(0),
   requested_full_first(0),
@@ -3051,7 +3021,7 @@ int OSD::init()
                                                new C_Tick_WithoutOSDLock(this));
   }
 
-  osd_lock.Unlock();
+  osd_lock.unlock();
 
   r = monc->authenticate();
   if (r < 0) {
@@ -3083,7 +3053,7 @@ int OSD::init()
     exit(1);
   }
 
-  osd_lock.Lock();
+  osd_lock.lock();
   if (is_stopping())
     return 0;
 
@@ -3391,9 +3361,9 @@ int OSD::shutdown()
 {
   if (!service.prepare_to_stop())
     return 0; // already shutting down
-  osd_lock.Lock();
+  osd_lock.lock();
   if (is_stopping()) {
-    osd_lock.Unlock();
+    osd_lock.unlock();
     return 0;
   }
   dout(0) << "shutdown" << dendl;
@@ -3444,12 +3414,13 @@ int OSD::shutdown()
   delete test_ops_hook;
   test_ops_hook = NULL;
 
-  osd_lock.Unlock();
+  osd_lock.unlock();
 
-  heartbeat_lock.Lock();
-  heartbeat_stop = true;
-  heartbeat_cond.Signal();
-  heartbeat_lock.Unlock();
+  {
+    std::lock_guard l{heartbeat_lock};
+    heartbeat_stop = true;
+    heartbeat_cond.notify_all();
+  }
   heartbeat_thread.join();
 
   osd_op_tp.drain();
@@ -3465,7 +3436,7 @@ int OSD::shutdown()
 
   boot_finisher.wait_for_empty();
 
-  osd_lock.Lock();
+  osd_lock.lock();
 
   boot_finisher.stop();
   reset_heartbeat_peers(true);
@@ -3526,9 +3497,9 @@ int OSD::shutdown()
   service.dump_live_pgids();
 #endif
 
-  osd_lock.Unlock();
+  osd_lock.unlock();
   cct->_conf.remove_observer(this);
-  osd_lock.Lock();
+  osd_lock.lock();
 
   service.meta_ch.reset();
 
@@ -3541,12 +3512,11 @@ int OSD::shutdown()
   }
 
   monc->shutdown();
-  osd_lock.Unlock();
-
-  map_lock.get_write();
-  osdmap = OSDMapRef();
-  map_lock.put_write();
-
+  osd_lock.unlock();
+  {
+    std::unique_lock l{map_lock};
+    osdmap = OSDMapRef();
+  }
   for (auto s : shards) {
     std::lock_guard l(s->osdmap_lock);
     s->shard_osdmap = OSDMapRef();
@@ -3970,7 +3940,7 @@ PGRef OSD::lookup_lock_pg(spg_t pgid)
 
 void OSD::load_pgs()
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   dout(0) << "load_pgs" << dendl;
 
   {
@@ -4159,7 +4129,7 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
   pg->init_collection_pool_opts();
 
   if (pg->is_primary()) {
-    Mutex::Locker locker(m_perf_queries_lock);
+    std::lock_guard locker{m_perf_queries_lock};
     pg->set_dynamic_perf_stats_queries(m_perf_queries);
   }
 
@@ -4416,7 +4386,7 @@ void OSD::need_heartbeat_peer_update()
 
 void OSD::maybe_update_heartbeat_peers()
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
 
   if (is_waiting_for_healthy() || is_active()) {
     utime_t now = ceph_clock_now();
@@ -4523,7 +4493,7 @@ void OSD::maybe_update_heartbeat_peers()
 
 void OSD::reset_heartbeat_peers(bool all)
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   dout(10) << "reset_heartbeat_peers" << dendl;
   utime_t stale = ceph_clock_now();
   stale -= cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
@@ -4555,16 +4525,16 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
   int from = m->get_source().num();
 
-  heartbeat_lock.Lock();
+  heartbeat_lock.lock();
   if (is_stopping()) {
-    heartbeat_lock.Unlock();
+    heartbeat_lock.unlock();
     m->put();
     return;
   }
 
   OSDMapRef curmap = service.get_osdmap();
   if (!curmap) {
-    heartbeat_lock.Unlock();
+    heartbeat_lock.unlock();
     m->put();
     return;
   }
@@ -4720,23 +4690,22 @@ void OSD::handle_osd_ping(MOSDPing *m)
     break;
   }
 
-  heartbeat_lock.Unlock();
+  heartbeat_lock.unlock();
   m->put();
 }
 
 void OSD::heartbeat_entry()
 {
-  std::lock_guard l(heartbeat_lock);
+  std::unique_lock l(heartbeat_lock);
   if (is_stopping())
     return;
   while (!heartbeat_stop) {
     heartbeat();
 
     double wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
-    utime_t w;
-    w.set_from_double(wait);
+    auto w = ceph::make_timespan(wait);
     dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
-    heartbeat_cond.WaitInterval(heartbeat_lock, w);
+    heartbeat_cond.wait_for(l, w);
     if (is_stopping())
       return;
     dout(30) << "heartbeat_entry woke up" << dendl;
@@ -4745,7 +4714,7 @@ void OSD::heartbeat_entry()
 
 void OSD::heartbeat_check()
 {
-  ceph_assert(heartbeat_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(heartbeat_lock));
   utime_t now = ceph_clock_now();
 
   // check for incoming heartbeats (move me elsewhere?)
@@ -4794,7 +4763,7 @@ void OSD::heartbeat_check()
 
 void OSD::heartbeat()
 {
-  ceph_assert(heartbeat_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
   dout(30) << "heartbeat" << dendl;
 
   // get CPU load avg
@@ -4924,7 +4893,7 @@ bool OSD::heartbeat_reset(Connection *con)
 
 void OSD::tick()
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   dout(10) << "tick" << dendl;
 
   if (is_active() || is_waiting_for_healthy()) {
@@ -4975,7 +4944,7 @@ void OSD::tick()
 
 void OSD::tick_without_osd_lock()
 {
-  ceph_assert(tick_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(tick_timer_lock));
   dout(10) << "tick_without_osd_lock" << dendl;
 
   logger->set(l_osd_cached_crc, buffer::get_cached_crc());
@@ -4992,11 +4961,11 @@ void OSD::tick_without_osd_lock()
   // osd_lock is not being held, which means the OSD state
   // might change when doing the monitor report
   if (is_active() || is_waiting_for_healthy()) {
-    heartbeat_lock.Lock();
-    heartbeat_check();
-    heartbeat_lock.Unlock();
-
-    map_lock.get_read();
+    {
+      std::lock_guard l{heartbeat_lock};
+      heartbeat_check();
+    }
+    map_lock.lock_shared();
     std::lock_guard l(mon_report_lock);
 
     // mon report?
@@ -5007,7 +4976,7 @@ void OSD::tick_without_osd_lock()
       send_full_update();
       send_failures();
     }
-    map_lock.put_read();
+    map_lock.unlock_shared();
 
     epoch_t max_waiting_epoch = 0;
     for (auto s : shards) {
@@ -5303,7 +5272,7 @@ void OSD::ms_handle_connect(Connection *con)
     } else if (is_booting()) {
       _send_boot();       // resend boot message
     } else {
-      map_lock.get_read();
+      map_lock.lock_shared();
       std::lock_guard l2(mon_report_lock);
 
       utime_t now = ceph_clock_now();
@@ -5320,7 +5289,7 @@ void OSD::ms_handle_connect(Connection *con)
       requeue_failures();
       send_failures();
 
-      map_lock.put_read();
+      map_lock.unlock_shared();
       if (is_active()) {
        send_beacon(ceph::coarse_mono_clock::now());
       }
@@ -5506,15 +5475,15 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest)
     boot_finisher.queue(
       new FunctionContext(
        [this](int r) {
-         std::lock_guard l(osd_lock);
+         std::unique_lock l(osd_lock);
          if (is_preboot()) {
            dout(10) << __func__ << " waiting for peering work to drain"
                     << dendl;
-           osd_lock.Unlock();
+           l.unlock();
            for (auto shard : shards) {
              shard->wait_min_pg_epoch(osdmap->get_epoch());
            }
-           osd_lock.Lock();
+           l.lock();
          }
          if (is_preboot()) {
            _send_boot();
@@ -5788,9 +5757,9 @@ void OSD::_collect_metadata(map<string,string> *pm)
 
 void OSD::queue_want_up_thru(epoch_t want)
 {
-  map_lock.get_read();
+  std::shared_lock map_locker{map_lock};
   epoch_t cur = osdmap->get_up_thru(whoami);
-  std::lock_guard l(mon_report_lock);
+  std::lock_guard report_locker(mon_report_lock);
   if (want > up_thru_wanted) {
     dout(10) << "queue_want_up_thru now " << want << " (was " << up_thru_wanted << ")"
             << ", currently " << cur
@@ -5802,12 +5771,11 @@ void OSD::queue_want_up_thru(epoch_t want)
             << ", currently " << cur
             << dendl;
   }
-  map_lock.put_read();
 }
 
 void OSD::send_alive()
 {
-  ceph_assert(mon_report_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(mon_report_lock));
   if (!osdmap->exists(whoami))
     return;
   epoch_t up_thru = osdmap->get_up_thru(whoami);
@@ -5823,7 +5791,7 @@ void OSD::request_full_map(epoch_t first, epoch_t last)
   dout(10) << __func__ << " " << first << ".." << last
           << ", previously requested "
           << requested_full_first << ".." << requested_full_last << dendl;
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   ceph_assert(first > 0 && last > 0);
   ceph_assert(first <= last);
   ceph_assert(first >= requested_full_first);  // we shouldn't ever ask for older maps
@@ -5847,7 +5815,7 @@ void OSD::request_full_map(epoch_t first, epoch_t last)
 void OSD::got_full_map(epoch_t e)
 {
   ceph_assert(requested_full_first <= requested_full_last);
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   if (requested_full_first == 0) {
     dout(20) << __func__ << " " << e << ", nothing requested" << dendl;
     return;
@@ -5887,8 +5855,8 @@ void OSD::requeue_failures()
 
 void OSD::send_failures()
 {
-  ceph_assert(map_lock.is_locked());
-  ceph_assert(mon_report_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(map_lock));
+  ceph_assert(ceph_mutex_is_locked(mon_report_lock));
   std::lock_guard l(heartbeat_lock);
   utime_t now = ceph_clock_now();
   while (!failure_queue.empty()) {
@@ -6155,9 +6123,9 @@ void OSD::do_command(
 
 namespace {
   class unlock_guard {
-    Mutex& m;
+    ceph::mutex& m;
   public:
-    explicit unlock_guard(Mutex& mutex)
+    explicit unlock_guard(ceph::mutex& mutex)
       : m(mutex)
     {
       m.unlock();
@@ -6692,7 +6660,7 @@ void OSD::scrub_purged_snaps()
     pg->queue_snap_retrim(snap);
     pg->unlock();
   }
-  osd_lock.Lock();
+  osd_lock.lock();
   if (is_stopping()) {
     return;
   }
@@ -6779,9 +6747,9 @@ bool OSD::ms_dispatch(Message *m)
 
   // lock!
 
-  osd_lock.Lock();
+  osd_lock.lock();
   if (is_stopping()) {
-    osd_lock.Unlock();
+    osd_lock.unlock();
     m->put();
     return true;
   }
@@ -6789,7 +6757,7 @@ bool OSD::ms_dispatch(Message *m)
   do_waiters();
   _dispatch(m);
 
-  osd_lock.Unlock();
+  osd_lock.unlock();
 
   return true;
 }
@@ -6840,7 +6808,7 @@ void OSDService::maybe_share_map(
 
 void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
 {
-  ceph_assert(session->session_dispatch_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock));
 
   auto i = session->waiting_on_map.begin();
   while (i != session->waiting_on_map.end()) {
@@ -7020,7 +6988,7 @@ int OSD::ms_handle_authentication(Connection *con)
 
 void OSD::do_waiters()
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
 
   dout(10) << "do_waiters -- start" << dendl;
   while (!finished.empty()) {
@@ -7043,7 +7011,7 @@ void OSD::dispatch_op(OpRequestRef op)
 
 void OSD::_dispatch(Message *m)
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   dout(20) << "_dispatch " << m << " " << *m << dendl;
 
   switch (m->get_type()) {
@@ -7391,7 +7359,7 @@ MPGStats* OSD::collect_pg_stats()
   // stats every time we're called.  This has equivalent cost to the
   // previous implementation's worst case where all PGs are busy and
   // their stats are always enqueued for sending.
-  RWLock::RLocker l(map_lock);
+  std::shared_lock l{map_lock};
 
   osd_stat_t cur_stat = service.get_osd_stat();
   cur_stat.os_perf_stat = store->get_cur_stats();
@@ -7508,10 +7476,10 @@ void OSD::wait_for_new_map(OpRequestRef op)
 
 void OSD::note_down_osd(int peer)
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   cluster_messenger->mark_down_addrs(osdmap->get_cluster_addrs(peer));
 
-  heartbeat_lock.Lock();
+  std::lock_guard l{heartbeat_lock};
   failure_queue.erase(peer);
   failure_pending.erase(peer);
   map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
@@ -7522,7 +7490,6 @@ void OSD::note_down_osd(int peer)
     }
     heartbeat_peers.erase(p);
   }
-  heartbeat_lock.Unlock();
 }
 
 void OSD::note_up_osd(int peer)
@@ -7635,7 +7602,7 @@ void OSD::handle_osd_map(MOSDMap *m)
     }
   }
 
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   map<epoch_t,OSDMapRef> added_maps;
   map<epoch_t,bufferlist> added_maps_bl;
   if (m->fsid != monc->get_fsid()) {
@@ -7923,7 +7890,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     dout(10) << __func__ << " bailing, we are shutting down" << dendl;
     return;
   }
-  map_lock.get_write();
+  map_lock.lock();
 
   bool do_shutdown = false;
   bool do_restart = false;
@@ -8128,7 +8095,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
     }
   }
 
-  map_lock.put_write();
+  map_lock.unlock();
 
   check_osdmap_features();
 
@@ -8480,7 +8447,7 @@ bool OSD::advance_pg(
 
 void OSD::consume_map()
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
   dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
 
   /** make sure the cluster is speaking in SORTBITWISE, because we don't
@@ -8597,7 +8564,7 @@ void OSD::consume_map()
 
 void OSD::activate_map()
 {
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
 
   dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
 
@@ -8693,12 +8660,12 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map,
     auto priv = con->get_priv();
     if (auto s = static_cast<Session*>(priv.get()); s) {
       if (!is_fast_dispatch)
-       s->session_dispatch_lock.Lock();
+       s->session_dispatch_lock.lock();
       clear_session_waiting_on_map(s);
       con->set_priv(nullptr);   // break ref <-> session cycle, if any
       s->con.reset();
       if (!is_fast_dispatch)
-       s->session_dispatch_lock.Unlock();
+       s->session_dispatch_lock.unlock();
     }
     return false;
   }
@@ -8717,7 +8684,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
   dout(15) << "require_same_or_newer_map " << epoch
           << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
 
-  ceph_assert(osd_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(osd_lock));
 
   // do they have a newer map?
   if (epoch > osdmap->get_epoch()) {
@@ -9298,7 +9265,7 @@ void OSD::handle_pg_query_nopg(const MQuery& q)
 // RECOVERY
 
 void OSDService::_maybe_queue_recovery() {
-  ceph_assert(recovery_lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
   uint64_t available_pushes;
   while (!awaiting_throttle.empty() &&
         _recover_now(&available_pushes)) {
@@ -9373,12 +9340,14 @@ void OSD::do_recovery(
       // This is true for the first recovery op and when the previous recovery op
       // has been scheduled in the past. The next recovery op is scheduled after
       // completing the sleep from now.
-      if (service.recovery_schedule_time < ceph_clock_now()) {
-        service.recovery_schedule_time = ceph_clock_now();
+      
+      if (auto now = ceph::real_clock::now();
+         service.recovery_schedule_time < now) {
+        service.recovery_schedule_time = now;
       }
-      service.recovery_schedule_time += recovery_sleep;
+      service.recovery_schedule_time += ceph::make_timespan(recovery_sleep);
       service.sleep_timer.add_event_at(service.recovery_schedule_time,
-                                               recovery_requeue_callback);
+                                      recovery_requeue_callback);
       dout(20) << "Recovery event scheduled at "
                << service.recovery_schedule_time << dendl;
       return;
@@ -9679,7 +9648,7 @@ const char** OSD::get_tracked_conf_keys() const
 void OSD::handle_conf_change(const ConfigProxy& conf,
                             const std::set <std::string> &changed)
 {
-  Mutex::Locker l(osd_lock);
+  std::lock_guard l{osd_lock};
   if (changed.count("osd_max_backfills")) {
     service.local_reserver.set_max(cct->_conf->osd_max_backfills);
     service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
@@ -10023,13 +9992,11 @@ void OSD::set_perf_queries(
     dout(1) << queries.size() - supported_queries.size()
             << " unsupported queries" << dendl;
   }
-
   {
-    Mutex::Locker locker(m_perf_queries_lock);
+    std::lock_guard locker{m_perf_queries_lock};
     m_perf_queries = supported_queries;
     m_perf_limits = queries;
   }
-
   std::vector<PGRef> pgs;
   _get_pgs(&pgs);
   for (auto& pg : pgs) {
index 95e772bd8e53a0354c38aafd6fdf92d3475995cd..67ec62159d553c1f1e766bf6e2fb9b41770bcf64 100644 (file)
@@ -19,8 +19,6 @@
 
 #include "msg/Dispatcher.h"
 
-#include "common/Mutex.h"
-#include "common/RWLock.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "common/AsyncReserver.h"
@@ -273,7 +271,7 @@ public:
 
 private:
   // -- scrub scheduling --
-  Mutex sched_scrub_lock;
+  ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
   int scrubs_pending;
   int scrubs_active;
 
@@ -363,8 +361,8 @@ public:
 
 private:
   // -- agent shared state --
-  Mutex agent_lock;
-  Cond agent_cond;
+  ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
+  ceph::condition_variable agent_cond;
   map<uint64_t, set<PGRef> > agent_queue;
   set<PGRef>::iterator agent_queue_pos;
   bool agent_valid_iterator;
@@ -381,7 +379,7 @@ private:
     }
   } agent_thread;
   bool agent_stop_flag;
-  Mutex agent_timer_lock;
+  ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
   SafeTimer agent_timer;
 
 public:
@@ -394,7 +392,7 @@ public:
       agent_valid_iterator = false;  // inserting higher-priority queue
     set<PGRef>& nq = agent_queue[priority];
     if (nq.empty())
-      agent_cond.Signal();
+      agent_cond.notify_all();
     nq.insert(pg);
   }
 
@@ -443,7 +441,7 @@ public:
     std::lock_guard l(agent_lock);
     ceph_assert(agent_ops > 0);
     --agent_ops;
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
 
   /// note start of an async (flush) op
@@ -461,7 +459,7 @@ public:
     --agent_ops;
     ceph_assert(agent_oids.count(oid) == 1);
     agent_oids.erase(oid);
-    agent_cond.Signal();
+    agent_cond.notify_all();
   }
 
   /// check if we are operating on an object
@@ -518,7 +516,7 @@ public:
   vector<Finisher*> objecter_finishers;
 
   // -- Watch --
-  Mutex watch_lock;
+  ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
   SafeTimer watch_timer;
   uint64_t next_notif_id;
   uint64_t get_next_id(epoch_t cur_epoch) {
@@ -527,15 +525,15 @@ public:
   }
 
   // -- Recovery/Backfill Request Scheduling --
-  Mutex recovery_request_lock;
+  ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
   SafeTimer recovery_request_timer;
 
   // For async recovery sleep
   bool recovery_needs_sleep = true;
-  utime_t recovery_schedule_time = utime_t();
+  ceph::real_clock::time_point recovery_schedule_time;
 
   // For recovery & scrub & snap
-  Mutex sleep_lock;
+  ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
   SafeTimer sleep_timer;
 
   // -- tids --
@@ -551,7 +549,7 @@ public:
   AsyncReserver<spg_t> remote_reserver;
 
   // -- pg merge --
-  Mutex merge_lock = {"OSD::merge_lock"};
+  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
   map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
   map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
   set<pg_t> not_ready_to_merge_source;
@@ -574,7 +572,7 @@ public:
 
   // -- pg_temp --
 private:
-  Mutex pg_temp_lock;
+  ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
   struct pg_temp_t {
     vector<int> acting;
     bool forced = false;
@@ -605,7 +603,7 @@ public:
 
 private:
   // -- pg recovery and associated throttling --
-  Mutex recovery_lock;
+  ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
   list<pair<epoch_t, PGRef> > awaiting_throttle;
 
   utime_t defer_recovery_until;
@@ -669,7 +667,7 @@ public:
   }
 
   // osd map cache (past osd maps)
-  Mutex map_cache_lock;
+  ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
   SharedLRU<epoch_t, const OSDMap> map_cache;
   SimpleLRU<epoch_t, bufferlist> map_bl_cache;
   SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
@@ -741,7 +739,7 @@ public:
   void shutdown();
 
   // -- stats --
-  Mutex stat_lock;
+  ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
   osd_stat_t osd_stat;
   uint32_t seq = 0;
 
@@ -765,7 +763,7 @@ public:
   // -- OSD Full Status --
 private:
   friend TestOpsSocketHook;
-  mutable Mutex full_status_lock;
+  mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
   enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
   const char *get_full_state_name(s_names s) const {
     switch (s) {
@@ -816,7 +814,8 @@ public:
 
   // -- epochs --
 private:
-  mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+  // protects access to boot_epoch, up_epoch, bind_epoch
+  mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
   epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
   epoch_t up_epoch;    // _most_recent_ epoch we were marked up
   epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
@@ -875,7 +874,7 @@ public:
 
 
 #ifdef PG_DEBUG_REFS
-  Mutex pgid_lock;
+  ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
   map<spg_t, int> pgid_tracker;
   map<spg_t, PG*> live_pgs;
   void add_pgid(spg_t pgid, PG *pg);
@@ -1100,11 +1099,12 @@ struct OSDShard {
 class OSD : public Dispatcher,
            public md_config_obs_t {
   /** OSD **/
-  Mutex osd_lock;          // global lock
+  // global lock
+  ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
   SafeTimer tick_timer;    // safe timer (osd_lock)
 
   // Tick timer for those stuff that do not need osd_lock
-  Mutex tick_timer_lock;
+  ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
   SafeTimer tick_timer_without_osd_lock;
   std::string gss_ktfile_client{};
 
@@ -1329,7 +1329,7 @@ private:
 private:
   void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
 
-  Mutex session_waiting_lock;
+  ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
   set<SessionRef> session_waiting_for_map;
 
   /// Caller assumes refs for included Sessions
@@ -1388,7 +1388,7 @@ private:
   void osdmap_subscribe(version_t epoch, bool force_request);
   /** @} monc helpers */
 
-  Mutex osdmap_subscribe_lock;
+  ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
   epoch_t latest_subscribed_epoch{0};
 
   // -- heartbeat --
@@ -1441,9 +1441,9 @@ private:
     int peer;
     explicit HeartbeatSession(int p) : peer(p) {}
   };
-  Mutex heartbeat_lock;
+  ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
   map<int, int> debug_heartbeat_drops_remaining;
-  Cond heartbeat_cond;
+  ceph::condition_variable heartbeat_cond;
   bool heartbeat_stop;
   std::atomic<bool> heartbeat_need_update;   
   map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
@@ -1476,7 +1476,7 @@ private:
 
   void heartbeat_kick() {
     std::lock_guard l(heartbeat_lock);
-    heartbeat_cond.Signal();
+    heartbeat_cond.notify_all();
   }
 
   struct T_Heartbeat : public Thread {
@@ -1528,7 +1528,7 @@ private:
   list<OpRequestRef> finished;
   
   void take_waiters(list<OpRequestRef>& ls) {
-    ceph_assert(osd_lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(osd_lock));
     finished.splice(finished.end(), ls);
   }
   void do_waiters();
@@ -1697,7 +1697,7 @@ protected:
 
   pool_pg_num_history_t pg_num_history;
 
-  RWLock          map_lock;
+  ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
   list<OpRequestRef>  waiting_for_osdmap;
   deque<utime_t> osd_markdown_log;
 
@@ -1752,7 +1752,7 @@ public:
   }
 
 protected:
-  Mutex merge_lock = {"OSD::merge_lock"};
+  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
   /// merge epoch -> target pgid -> source pgid -> pg
   map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
 
@@ -1812,7 +1812,7 @@ protected:
   void _finish_splits(set<PGRef>& pgs);
 
   // == monitor interaction ==
-  Mutex mon_report_lock;
+  ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
   utime_t last_mon_report;
   Finisher boot_finisher;
 
@@ -1861,7 +1861,7 @@ protected:
   void cancel_pending_failures();
 
   ceph::coarse_mono_clock::time_point last_sent_beacon;
-  Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+  ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
   epoch_t min_last_epoch_clean = 0;
   // which pgs were scanned for min_lec
   std::vector<pg_t> min_last_epoch_clean_pgs;
@@ -2178,7 +2178,7 @@ private:
   void get_perf_reports(
       std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
 
-  Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"};
+  ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
   std::list<OSDPerfMetricQuery> m_perf_queries;
   std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
 };
index 42c8d997d31d9119522ac19cf5c7703081c07760..ba59c21dfbed3c453aa3187ad5783793d7920307 100644 (file)
@@ -135,7 +135,7 @@ void ParallelPGMapper::Job::finish_one()
        finish = ceph_clock_now();
        complete();
       }
-      cond.Signal();
+      cond.notify_all();
       fin = onfinish;
       onfinish = nullptr;
     }
index 5435ef7b7ac101bbed71bedfc36eb4937f7a055f..37ec74f6f50b55cf8849291a1d51ba9e8b138e7e 100644 (file)
@@ -24,8 +24,8 @@ public:
     bool aborted = false;
     Context *onfinish = nullptr;
 
-    Mutex lock = {"ParallelPGMapper::Job::lock"};
-    Cond cond;
+    ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
+    ceph::condition_variable cond;
 
     Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
     virtual ~Job() {
@@ -38,15 +38,15 @@ public:
     virtual void complete() = 0;
 
     void set_finish_event(Context *fin) {
-      lock.Lock();
+      lock.lock();
       if (shards == 0) {
        // already done.
-       lock.Unlock();
+       lock.unlock();
        fin->complete(0);
       } else {
        // set finisher
        onfinish = fin;
-       lock.Unlock();
+       lock.unlock();
       }
     }
     bool is_done() {
@@ -57,33 +57,29 @@ public:
       return finish - start;
     }
     void wait() {
-      std::lock_guard l(lock);
-      while (shards > 0) {
-       cond.Wait(lock);
-      }
+      std::unique_lock l(lock);
+      cond.wait(l, [this] { return shards == 0; });
     }
     bool wait_for(double duration) {
       utime_t until = start;
       until += duration;
-      std::lock_guard l(lock);
+      std::unique_lock l(lock);
       while (shards > 0) {
        if (ceph_clock_now() >= until) {
          return false;
        }
-       cond.Wait(lock);
+       cond.wait(l);
       }
       return true;
     }
     void abort() {
       Context *fin = nullptr;
       {
-       std::lock_guard l(lock);
+       std::unique_lock l(lock);
        aborted = true;
        fin = onfinish;
        onfinish = nullptr;
-       while (shards > 0) {
-         cond.Wait(lock);
-       }
+       cond.wait(l, [this] { return shards == 0; });
       }
       if (fin) {
        fin->complete(-ECANCELED);
index ac8ec080e39f0371f5c4fd505881f357227561a6..64c99990a7ac9276ffec7664dc22de4412c20753 100644 (file)
@@ -191,12 +191,9 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   scrub_queued(false),
   recovery_queued(false),
   recovery_ops_active(0),
-  heartbeat_peer_lock("PG::heartbeat_peer_lock"),
   backfill_reserving(false),
-  pg_stats_publish_lock("PG::pg_stats_publish_lock"),
   pg_stats_publish_valid(false),
   finish_sync_event(NULL),
-  backoff_lock("PG::backoff_lock"),
   scrub_after_recovery(false),
   active_pushes(0),
   recovery_state(
@@ -229,7 +226,7 @@ PG::~PG()
 
 void PG::lock(bool no_lockdep) const
 {
-  _lock.Lock(no_lockdep);
+  _lock.lock(no_lockdep);
   // if we have unrecorded dirty state with the lock dropped, there is a bug
   ceph_assert(!recovery_state.debug_has_dirty_state());
 
@@ -707,7 +704,7 @@ void PG::rm_backoff(BackoffRef b)
 {
   dout(10) << __func__ << " " << *b << dendl;
   std::lock_guard l(backoff_lock);
-  ceph_assert(b->lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
   ceph_assert(b->pg == this);
   auto p = backoffs.find(b->begin);
   // may race with release_backoffs()
@@ -783,7 +780,7 @@ void PG::clear_probe_targets()
 void PG::update_heartbeat_peers(set<int> new_peers)
 {
   bool need_update = false;
-  heartbeat_peer_lock.Lock();
+  heartbeat_peer_lock.lock();
   if (new_peers == heartbeat_peers) {
     dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
   } else {
@@ -791,7 +788,7 @@ void PG::update_heartbeat_peers(set<int> new_peers)
     heartbeat_peers.swap(new_peers);
     need_update = true;
   }
-  heartbeat_peer_lock.Unlock();
+  heartbeat_peer_lock.unlock();
 
   if (need_update)
     osd->need_heartbeat_peer_update();
@@ -815,8 +812,7 @@ void PG::publish_stats_to_osd()
   if (!is_primary())
     return;
 
-  pg_stats_publish_lock.Lock();
-
+  std::lock_guard l{pg_stats_publish_lock};
   auto stats = recovery_state.prepare_stats_for_publish(
     pg_stats_publish_valid,
     pg_stats_publish,
@@ -825,16 +821,13 @@ void PG::publish_stats_to_osd()
     pg_stats_publish = stats.value();
     pg_stats_publish_valid = true;
   }
-
-  pg_stats_publish_lock.Unlock();
 }
 
 void PG::clear_publish_stats()
 {
   dout(15) << "clear_stats" << dendl;
-  pg_stats_publish_lock.Lock();
+  std::lock_guard l{pg_stats_publish_lock};
   pg_stats_publish_valid = false;
-  pg_stats_publish_lock.Unlock();
 }
 
 /**
@@ -1991,7 +1984,7 @@ bool PG::try_reserve_recovery_space(
 
   // This lock protects not only the stats OSDService but also setting the
   // pg primary_bytes.  That's why we don't immediately unlock
-  Mutex::Locker l(osd->stat_lock);
+  std::lock_guard l{osd->stat_lock};
   osd_stat_t cur_stat = osd->osd_stat;
   if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
       (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
@@ -2180,21 +2173,19 @@ void PG::_scan_snaps(ScrubMap &smap)
 
        // wait for repair to apply to avoid confusing other bits of the system.
        {
-         Cond my_cond;
-         Mutex my_lock("PG::_scan_snaps my_lock");
+         ceph::condition_variable my_cond;
+         ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
          int r = 0;
          bool done;
          t.register_on_applied_sync(
-           new C_SafeCond(&my_lock, &my_cond, &done, &r));
+           new C_SafeCond(my_lock, my_cond, &done, &r));
          r = osd->store->queue_transaction(ch, std::move(t));
          if (r != 0) {
            derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
                 << dendl;
          } else {
-           my_lock.Lock();
-           while (!done)
-             my_cond.Wait(my_lock);
-           my_lock.Unlock();
+           std::unique_lock l{my_lock};
+           my_cond.wait(l, [&done] { return done;});
          }
        }
       }
@@ -3732,11 +3723,11 @@ void PG::do_delete_work(ObjectStore::Transaction &t)
         unlock();
       });
 
-      utime_t delete_schedule_time = ceph_clock_now();
-      delete_schedule_time += osd_delete_sleep;
-      Mutex::Locker l(osd->sleep_lock);
+      auto delete_schedule_time = ceph::real_clock::now();
+      delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
+      std::lock_guard l{osd->sleep_lock};
       osd->sleep_timer.add_event_at(delete_schedule_time,
-                                               delete_requeue_callback);
+                                   delete_requeue_callback);
       dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
       return;
     }
@@ -3883,23 +3874,21 @@ void PG::dump_missing(Formatter *f)
 
 void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
 {
-  pg_stats_publish_lock.Lock();
+  std::lock_guard l{pg_stats_publish_lock};
   if (pg_stats_publish_valid) {
     f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
   }
-  pg_stats_publish_lock.Unlock();
 }
 
 void PG::with_heartbeat_peers(std::function<void(int)> f)
 {
-  heartbeat_peer_lock.Lock();
+  std::lock_guard l{heartbeat_peer_lock};
   for (auto p : heartbeat_peers) {
     f(p);
   }
   for (auto p : probe_targets) {
     f(p);
   }
-  heartbeat_peer_lock.Unlock();
 }
 
 uint64_t PG::get_min_alloc_size() const {
index e393b7b20edec148937384f806b6765856c6d6be..beffac1a8a11cd10cb229b36bac5c37e121ae5f7 100644 (file)
@@ -98,10 +98,10 @@ class PGRecoveryStats {
     per_state_info() : enter(0), exit(0), events(0) {}
   };
   map<const char *,per_state_info> info;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock");
 
   public:
-  PGRecoveryStats() : lock("PGRecoverStats::lock") {}
+  PGRecoveryStats() = default;
 
   void reset() {
     std::lock_guard l(lock);
@@ -220,7 +220,7 @@ public:
   void unlock() const {
     //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
     ceph_assert(!recovery_state.debug_has_dirty_state());
-    _lock.Unlock();
+    _lock.unlock();
   }
   bool is_locked() const {
     return _lock.is_locked();
@@ -607,12 +607,12 @@ protected:
   // get() should be called on pointer copy (to another thread, etc.).
   // put() should be called on destruction of some previously copied pointer.
   // unlock() when done with the current pointer (_most common_).
-  mutable Mutex _lock = {"PG::_lock"};
+  mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
 
   std::atomic<unsigned int> ref{0};
 
 #ifdef PG_DEBUG_REFS
-  Mutex _ref_id_lock = {"PG::_ref_id_lock"};
+  ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
   map<uint64_t, string> _live_ids;
   map<string, uint64_t> _tag_counts;
   uint64_t _ref_id = 0;
@@ -686,7 +686,8 @@ protected:
   void set_probe_targets(const set<pg_shard_t> &probe_set) override;
   void clear_probe_targets() override;
 
-  Mutex heartbeat_peer_lock;
+  ceph::mutex heartbeat_peer_lock =
+    ceph::make_mutex("PG::heartbeat_peer_lock");
   set<int> heartbeat_peers;
   set<int> probe_targets;
 
@@ -836,7 +837,7 @@ public:
   // The value of num_bytes could be negative,
   // but we don't let info.stats.stats.sum.num_bytes go negative.
   void add_num_bytes(int64_t num_bytes) {
-    ceph_assert(_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(_lock));
     if (num_bytes) {
       recovery_state.update_stats(
        [num_bytes](auto &history, auto &stats) {
@@ -849,7 +850,7 @@ public:
     }
   }
   void sub_num_bytes(int64_t num_bytes) {
-    ceph_assert(_lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(_lock));
     ceph_assert(num_bytes >= 0);
     if (num_bytes) {
       recovery_state.update_stats(
@@ -865,7 +866,7 @@ public:
 
   // Only used in testing so not worried about needing the PG lock here
   int64_t get_stats_num_bytes() {
-    Mutex::Locker l(_lock);
+    std::lock_guard l{_lock};
     int num_bytes = info.stats.stats.sum.num_bytes;
     if (pool.info.is_erasure()) {
       num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
@@ -976,7 +977,8 @@ protected:
   object_stat_collection_t unstable_stats;
 
   // publish stats
-  Mutex pg_stats_publish_lock;
+  ceph::mutex pg_stats_publish_lock =
+    ceph::make_mutex("PG::pg_stats_publish_lock");
   bool pg_stats_publish_valid;
   pg_stat_t pg_stats_publish;
 
@@ -1060,7 +1062,8 @@ protected:
   friend class C_DeleteMore;
 
   // -- backoff --
-  Mutex backoff_lock;  // orders inside Backoff::lock
+  ceph::mutex backoff_lock = // orders inside Backoff::lock
+    ceph::make_mutex("PG::backoff_lock");
   map<hobject_t,set<BackoffRef>> backoffs;
 
   void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end);
index d85ce1d3a633afcebfe8da908b599b2e8c0e02ef..0d66d3c43ddb37619fa4c6c5deea234bf97b0751 100644 (file)
@@ -1525,7 +1525,6 @@ PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap,
     PGBackend::build_pg_backend(
       _pool.info, ec_profile, this, coll_t(p), ch, o->store, cct)),
   object_contexts(o->cct, o->cct->_conf->osd_pg_object_context_cache_count),
-  snapset_contexts_lock("PrimaryLogPG::snapset_contexts_lock"),
   new_backfill(false),
   temp_seq(0),
   snap_trimmer_machine(this)
index c50ebc722a54213d7018b7c767874a22fdbcf627..3498a419f13d1d3c3e0cc133b537e1068c9dcb1a 100644 (file)
@@ -1007,7 +1007,8 @@ protected:
   SharedLRU<hobject_t, ObjectContext> object_contexts;
   // map from oid.snapdir() to SnapSetContext *
   map<hobject_t, SnapSetContext*> snapset_contexts;
-  Mutex snapset_contexts_lock;
+  ceph::mutex snapset_contexts_lock =
+    ceph::make_mutex("PrimaryLogPG::snapset_contexts_lock");
 
   // debug order that client ops are applied
   map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
@@ -1053,7 +1054,7 @@ protected:
     _register_snapset_context(ssc);
   }
   void _register_snapset_context(SnapSetContext *ssc) {
-    ceph_assert(snapset_contexts_lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(snapset_contexts_lock));
     if (!ssc->registered) {
       ceph_assert(snapset_contexts.count(ssc->oid) == 0);
       ssc->registered = true;
index e391200d7469823c49a89a067b059fe06598fe1a..ec01e0018e28dc242ae31bd9b8497f777447f963 100644 (file)
@@ -16,7 +16,7 @@
 #define CEPH_OSD_SESSION_H
 
 #include "common/RefCountedObj.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "global/global_context.h"
 #include "include/spinlock.h"
 #include "OSDCap.h"
@@ -96,7 +96,7 @@ struct Backoff : public RefCountedObject {
     }
   }
 
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("Backoff::lock");
   // NOTE: the owning PG and session are either
   //   - *both* set, or
   //   - both null (teardown), or
@@ -111,7 +111,6 @@ struct Backoff : public RefCountedObject {
     : RefCountedObject(g_ceph_context, 0),
       pgid(pgid),
       id(i),
-      lock("Backoff::lock"),
       pg(pg),
       session(s),
       begin(b),
@@ -135,14 +134,15 @@ struct Session : public RefCountedObject {
   entity_addr_t socket_addr;
   WatchConState wstate;
 
-  Mutex session_dispatch_lock;
+  ceph::mutex session_dispatch_lock =
+    ceph::make_mutex("Session::session_dispatch_lock");
   boost::intrusive::list<OpRequest> waiting_on_map;
 
   ceph::spinlock sent_epoch_lock;
   epoch_t last_sent_epoch;
 
   /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
-  Mutex backoff_lock;
+  ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock");
   std::atomic<int> backoff_count= {0};  ///< simple count of backoffs
   map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs;
 
@@ -153,9 +153,7 @@ struct Session : public RefCountedObject {
     con(con_),
     socket_addr(con_->get_peer_socket_addr()),
     wstate(cct),
-    session_dispatch_lock("Session::session_dispatch_lock"),
-    last_sent_epoch(0),
-    backoff_lock("Session::backoff_lock")
+    last_sent_epoch(0)
     {}
 
   entity_addr_t& get_peer_socket_addr() {
@@ -210,7 +208,7 @@ struct Session : public RefCountedObject {
   // called by PG::release_*_backoffs and PG::clear_backoffs()
   void rm_backoff(BackoffRef b) {
     std::lock_guard l(backoff_lock);
-    ceph_assert(b->lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
     ceph_assert(b->session == this);
     auto i = backoffs.find(b->pgid);
     if (i != backoffs.end()) {
index bb25b4487c1903894c02f34f6e6698d840548855..4e13887e7ba3a45cdb3a9a28183b4f3d3c899b36 100644 (file)
@@ -47,8 +47,7 @@ Notify::Notify(
     notify_id(notify_id),
     version(version),
     osd(osd),
-    cb(NULL),
-    lock("Notify::lock") {}
+    cb(NULL) {}
 
 NotifyRef Notify::makeNotifyRef(
   ConnectionRef client,
@@ -75,27 +74,27 @@ class NotifyTimeoutCB : public CancelableContext {
 public:
   explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
   void finish(int) override {
-    notif->osd->watch_lock.Unlock();
-    notif->lock.Lock();
+    notif->osd->watch_lock.unlock();
+    notif->lock.lock();
     if (!canceled)
       notif->do_timeout(); // drops lock
     else
-      notif->lock.Unlock();
-    notif->osd->watch_lock.Lock();
+      notif->lock.unlock();
+    notif->osd->watch_lock.lock();
   }
   void cancel() override {
-    ceph_assert(notif->lock.is_locked_by_me());
+    ceph_assert(ceph_mutex_is_locked(notif->lock));
     canceled = true;
   }
 };
 
 void Notify::do_timeout()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked(lock));
   dout(10) << "timeout" << dendl;
   cb = nullptr;
   if (is_discarded()) {
-    lock.Unlock();
+    lock.unlock();
     return;
   }
 
@@ -104,7 +103,7 @@ void Notify::do_timeout()
   ceph_assert(complete);
   set<WatchRef> _watchers;
   _watchers.swap(watchers);
-  lock.Unlock();
+  lock.unlock();
 
   for (set<WatchRef>::iterator i = _watchers.begin();
        i != _watchers.end();
@@ -120,28 +119,26 @@ void Notify::do_timeout()
 
 void Notify::register_cb()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked(lock));
   {
-    osd->watch_lock.Lock();
+    std::lock_guard l{osd->watch_lock};
     cb = new NotifyTimeoutCB(self.lock());
     if (!osd->watch_timer.add_event_after(timeout, cb)) {
       cb = nullptr;
     }
-    osd->watch_lock.Unlock();
   }
 }
 
 void Notify::unregister_cb()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (!cb)
     return;
   cb->cancel();
   {
-    osd->watch_lock.Lock();
+    std::lock_guard l{osd->watch_lock};
     osd->watch_timer.cancel_event(cb);
     cb = nullptr;
-    osd->watch_lock.Unlock();
   }
 }
 
@@ -245,14 +242,14 @@ public:
     OSDService *osd(watch->osd);
     ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
     boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
-    osd->watch_lock.Unlock();
+    osd->watch_lock.unlock();
     pg->lock();
     watch->cb = nullptr;
     if (!watch->is_discarded() && !canceled)
       watch->pg->handle_watch_timeout(watch);
     delete this; // ~Watch requires pg lock!
     pg->unlock();
-    osd->watch_lock.Lock();
+    osd->watch_lock.lock();
   }
 };
 
index 823a470b33ccddd0f54b2e5934be1693c5250500..3d3b09fd772234a587c32f609ad509527131e2bf 100644 (file)
@@ -64,7 +64,7 @@ class Notify {
 
   OSDService *osd;
   CancelableContext *cb;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("Notify::lock");
 
   /// (gid,cookie) -> reply_bl for everyone who acked the notify
   std::multimap<std::pair<uint64_t,uint64_t>, ceph::buffer::list> notify_replies;
@@ -270,11 +270,11 @@ public:
  * Lives in the Session object of an OSD connection
  */
 class WatchConState {
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("WatchConState");
   std::set<WatchRef> watches;
 public:
   CephContext* cct;
-  explicit WatchConState(CephContext* cct) : lock("WatchConState"), cct(cct) {}
+  explicit WatchConState(CephContext* cct) : cct(cct) {}
 
   /// Add a watch
   void addWatch(