]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 03:15:03 +0000 (11:15 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 01:34:50 +0000 (09:34 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/OSDMonitor.h
src/mon/Paxos.cc
src/mon/Paxos.h

index c3d136713166445df6f1c9b162cbf74cf4086d78..f955d5d766604f02d81b9bf0b70ce1bc9e47e953 100644 (file)
@@ -62,7 +62,6 @@ MonClient::MonClient(CephContext *cct_) :
   Dispatcher(cct_),
   AuthServer(cct_),
   messenger(NULL),
-  monc_lock("MonClient::monc_lock"),
   timer(cct_, monc_lock),
   finisher(cct_),
   initialized(false),
@@ -93,15 +92,12 @@ int MonClient::build_initial_monmap()
 int MonClient::get_monmap()
 {
   ldout(cct, 10) << __func__ << dendl;
-  std::lock_guard l(monc_lock);
+  std::unique_lock l(monc_lock);
   
   sub.want("monmap", 0, 0);
   if (!_opened())
     _reopen_session();
-
-  while (want_monmap)
-    map_cond.Wait(monc_lock);
-
+  map_cond.wait(l, [this] { return !want_monmap; });
   ldout(cct, 10) << __func__ << " done" << dendl;
   return 0;
 }
@@ -113,9 +109,6 @@ int MonClient::get_monmap_and_config()
 
   int tries = 10;
 
-  utime_t interval;
-  interval.set_from_double(cct->_conf->mon_client_hunt_interval);
-
   cct->init_crypto();
   auto shutdown_crypto = make_scope_guard([this] {
     cct->shutdown_crypto();
@@ -156,7 +149,7 @@ int MonClient::get_monmap_and_config()
       break;
     }
     {
-      std::lock_guard l(monc_lock);
+      std::unique_lock l(monc_lock);
       if (monmap.get_epoch() &&
          !monmap.persistent_features.contains_all(
            ceph::features::mon::FEATURE_MIMIC)) {
@@ -167,7 +160,8 @@ int MonClient::get_monmap_and_config()
       }
       while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
        ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
-       r = map_cond.WaitInterval(monc_lock, interval);
+       map_cond.wait_for(l, ceph::make_timespan(
+          cct->_conf->mon_client_hunt_interval));
       }
       if (got_config) {
        ldout(cct,10) << __func__ << " success" << dendl;
@@ -259,14 +253,12 @@ int MonClient::ping_monitor(const string &mon_id, string *result_reply)
   pinger->mc->start(monmap.get_epoch(), entity_name);
   con->send_message(new MPing);
 
-  pinger->lock.Lock();
   int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
   if (ret == 0) {
     ldout(cct,10) << __func__ << " got ping reply" << dendl;
   } else {
     ret = -ret;
   }
-  pinger->lock.Unlock();
 
   con->mark_down();
   pinger->mc.reset();
@@ -412,7 +404,7 @@ void MonClient::handle_monmap(MMonMap *m)
   }
 
   sub.got("monmap", monmap.get_epoch());
-  map_cond.Signal();
+  map_cond.notify_all();
   want_monmap = false;
 
   if (authenticate_err == 1) {
@@ -431,7 +423,7 @@ void MonClient::handle_config(MConfig *m)
        m->put();
       }));
   got_config = true;
-  map_cond.Signal();
+  map_cond.notify_all();
 }
 
 // ----------------------
@@ -478,7 +470,7 @@ int MonClient::init()
 void MonClient::shutdown()
 {
   ldout(cct, 10) << __func__ << dendl;
-  monc_lock.Lock();
+  monc_lock.lock();
   stopping = true;
   while (!version_requests.empty()) {
     version_requests.begin()->second->context->complete(-ECANCELED);
@@ -501,22 +493,22 @@ void MonClient::shutdown()
   pending_cons.clear();
   auth.reset();
 
-  monc_lock.Unlock();
+  monc_lock.unlock();
 
   if (initialized) {
     finisher.wait_for_empty();
     finisher.stop();
     initialized = false;
   }
-  monc_lock.Lock();
+  monc_lock.lock();
   timer.shutdown();
   stopping = false;
-  monc_lock.Unlock();
+  monc_lock.unlock();
 }
 
 int MonClient::authenticate(double timeout)
 {
-  std::lock_guard lock(monc_lock);
+  std::unique_lock lock{monc_lock};
 
   if (active_con) {
     ldout(cct, 5) << "already authenticated" << dendl;
@@ -527,20 +519,20 @@ int MonClient::authenticate(double timeout)
   if (!_opened())
     _reopen_session();
 
-  utime_t until = ceph_clock_now();
-  until += timeout;
+  auto until = ceph::real_clock::now();
+  until += ceph::make_timespan(timeout);
   if (timeout > 0.0)
     ldout(cct, 10) << "authenticate will time out at " << until << dendl;
   authenticate_err = 1;  // == in progress
   while (!active_con && authenticate_err >= 0) {
     if (timeout > 0.0) {
-      int r = auth_cond.WaitUntil(monc_lock, until);
-      if (r == ETIMEDOUT && !active_con) {
+      auto r = auth_cond.wait_until(lock, until);
+      if (r == cv_status::timeout && !active_con) {
        ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
-       authenticate_err = -r;
+       authenticate_err = -ETIMEDOUT;
       }
     } else {
-      auth_cond.Wait(monc_lock);
+      auth_cond.wait(lock);
     }
   }
 
@@ -561,7 +553,7 @@ int MonClient::authenticate(double timeout)
 
 void MonClient::handle_auth(MAuthReply *m)
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   if (!_hunting()) {
     std::swap(active_con->get_auth(), auth);
     int ret = active_con->authenticate(m);
@@ -614,7 +606,7 @@ void MonClient::_finish_auth(int auth_err)
     ceph_assert(auth);
     _check_auth_tickets();
   }
-  auth_cond.SignalAll();
+  auth_cond.notify_all();
 
   if (!auth_err) {
     Context *cb = nullptr;
@@ -622,9 +614,9 @@ void MonClient::_finish_auth(int auth_err)
       cb = session_established_context.release();
     }
     if (cb) {
-      monc_lock.Unlock();
+      monc_lock.unlock();
       cb->complete(0);
-      monc_lock.Lock();
+      monc_lock.lock();
     }
   }
 }
@@ -633,7 +625,7 @@ void MonClient::_finish_auth(int auth_err)
 
 void MonClient::_send_mon_message(Message *m)
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   if (active_con) {
     auto cur_con = active_con->get_con();
     ldout(cct, 10) << "_send_mon_message to mon."
@@ -647,7 +639,7 @@ void MonClient::_send_mon_message(Message *m)
 
 void MonClient::_reopen_session(int rank)
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   ldout(cct, 10) << __func__ << " rank " << rank << dendl;
 
   active_con.reset();
@@ -786,7 +778,7 @@ bool MonClient::ms_handle_reset(Connection *con)
 
 bool MonClient::_opened() const
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   return active_con || _hunting();
 }
 
@@ -812,7 +804,7 @@ void MonClient::_start_hunting()
 void MonClient::_finish_hunting(int auth_err)
 {
   ldout(cct,10) << __func__ << " " << auth_err << dendl;
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   // the pending conns have been cleaned.
   ceph_assert(!_hunting());
   if (active_con) {
@@ -917,7 +909,7 @@ void MonClient::schedule_tick()
 
 void MonClient::_renew_subs()
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   if (!sub.have_new()) {
     ldout(cct, 10) << __func__ << " - empty" << dendl;
     return;
@@ -943,7 +935,7 @@ void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
 
 int MonClient::_check_auth_tickets()
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   if (active_con && auth) {
     if (auth->need_tickets()) {
       ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
@@ -961,7 +953,7 @@ int MonClient::_check_auth_tickets()
 
 int MonClient::_check_auth_rotating()
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   if (!rotating_secrets ||
       !auth_principal_needs_rotating_keys(entity_name)) {
     ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
@@ -1010,10 +1002,7 @@ int MonClient::_check_auth_rotating()
 
 int MonClient::wait_auth_rotating(double timeout)
 {
-  std::lock_guard l(monc_lock);
-  utime_t now = ceph_clock_now();
-  utime_t until = now;
-  until += timeout;
+  std::unique_lock l(monc_lock);
 
   // Must be initialized
   ceph_assert(auth != nullptr);
@@ -1024,18 +1013,18 @@ int MonClient::wait_auth_rotating(double timeout)
   if (!rotating_secrets)
     return 0;
 
-  while (auth_principal_needs_rotating_keys(entity_name) &&
-        rotating_secrets->need_new_secrets(now)) {
-    if (now >= until) {
-      ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
-      return -ETIMEDOUT;
-    }
-    ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
-    auth_cond.WaitUntil(monc_lock, until);
-    now = ceph_clock_now();
+  ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
+  utime_t now = ceph_clock_now();
+  if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
+    return (!auth_principal_needs_rotating_keys(entity_name) ||
+           !rotating_secrets->need_new_secrets(now));
+  })) {
+    ldout(cct, 10) << __func__ << " done" << dendl;
+    return 0;
+  } else {
+    ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
+    return -ETIMEDOUT;
   }
-  ldout(cct, 10) << __func__ << " done" << dendl;
-  return 0;
 }
 
 // ---------
@@ -1121,7 +1110,7 @@ void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
 
 int MonClient::_cancel_mon_command(uint64_t tid)
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
 
   auto it = mon_commands.find(tid);
   if (it == mon_commands.end()) {
@@ -1243,7 +1232,7 @@ void MonClient::get_version(string map, version_t *newest, version_t *oldest, Co
 
 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
 {
-  ceph_assert(monc_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(monc_lock));
   auto iter = version_requests.find(m->handle);
   if (iter == version_requests.end()) {
     ldout(cct, 0) << __func__ << " version request with handle " << m->handle
index d8716fa27d529e1c226ea3005aa893839221c7cc..2fa4fdcf8643ad15f513c95a58342b563c51aa0e 100644 (file)
@@ -132,8 +132,8 @@ private:
 struct MonClientPinger : public Dispatcher,
                         public AuthClient {
 
-  Mutex lock;
-  Cond ping_recvd_cond;
+  ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
+  ceph::condition_variable ping_recvd_cond;
   std::string *result;
   bool done;
   RotatingKeyRing *keyring;
@@ -143,24 +143,24 @@ struct MonClientPinger : public Dispatcher,
                  RotatingKeyRing *keyring,
                  std::string *res_) :
     Dispatcher(cct_),
-    lock("MonClientPinger::lock"),
     result(res_),
     done(false),
     keyring(keyring)
   { }
 
   int wait_for_reply(double timeout = 0.0) {
-    utime_t until = ceph_clock_now();
-    until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
+    std::unique_lock locker{lock};
+    if (timeout <= 0) {
+      timeout = cct->_conf->client_mount_timeout;
+    }
     done = false;
-
-    int ret = 0;
-    while (!done) {
-      ret = ping_recvd_cond.WaitUntil(lock, until);
-      if (ret == ETIMEDOUT)
-        break;
+    if (ping_recvd_cond.wait_for(locker,
+                                ceph::make_timespan(timeout),
+                                [this] { return done; })) {
+      return 0;
+    } else {
+      return ETIMEDOUT;
     }
-    return ret;
   }
 
   bool ms_dispatch(Message *m) override {
@@ -175,14 +175,14 @@ struct MonClientPinger : public Dispatcher,
       decode(*result, p);
     }
     done = true;
-    ping_recvd_cond.SignalAll();
+    ping_recvd_cond.notify_all();
     m->put();
     return true;
   }
   bool ms_handle_reset(Connection *con) override {
     std::lock_guard l(lock);
     done = true;
-    ping_recvd_cond.SignalAll();
+    ping_recvd_cond.notify_all();
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {}
@@ -247,7 +247,7 @@ private:
 
   EntityName entity_name;
 
-  mutable Mutex monc_lock;
+  mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
   SafeTimer timer;
   Finisher finisher;
 
@@ -275,7 +275,7 @@ private:
 
   // monclient
   bool want_monmap;
-  Cond map_cond;
+  ceph::condition_variable map_cond;
   bool passthrough_monmap = false;
   bool got_config = false;
 
@@ -283,7 +283,7 @@ private:
   std::unique_ptr<AuthClientHandler> auth;
   uint32_t want_keys = 0;
   uint64_t global_id = 0;
-  Cond auth_cond;
+  ceph::condition_variable auth_cond;
   int authenticate_err = 0;
   bool authenticated = false;
 
index d33b0d89d4c02c0d2cc4bf19baf52c33e3f30490..6bf7d081724467c4715ad59c7903156c13c9ebd5 100644 (file)
@@ -136,7 +136,6 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   rank(-1), 
   messenger(m),
   con_self(m ? m->get_loopback_connection() : NULL),
-  lock("Monitor::lock"),
   timer(cct_, lock),
   finisher(cct_, "mon_finisher", "fin"),
   cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
@@ -551,7 +550,7 @@ void Monitor::handle_conf_change(const ConfigProxy& conf,
       changed.count("mon_health_to_clog_interval") ||
       changed.count("mon_health_to_clog_tick_interval")) {
     finisher.queue(new C_MonContext(this, [this, changed](int) {
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       health_to_clog_update_conf(changed);
     }));
   }
@@ -559,7 +558,7 @@ void Monitor::handle_conf_change(const ConfigProxy& conf,
   if (changed.count("mon_scrub_interval")) {
     int scrub_interval = conf->mon_scrub_interval;
     finisher.queue(new C_MonContext(this, [this, scrub_interval](int) {
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       scrub_update_interval(scrub_interval);
     }));
   }
@@ -624,14 +623,13 @@ int Monitor::sanitize_options()
 
 int Monitor::preinit()
 {
-  lock.Lock();
+  std::unique_lock l(lock);
 
   dout(1) << "preinit fsid " << monmap->fsid << dendl;
 
   int r = sanitize_options();
   if (r < 0) {
     derr << "option sanitization failed!" << dendl;
-    lock.Unlock();
     return r;
   }
 
@@ -691,7 +689,6 @@ int Monitor::preinit()
     if (r == -ENOENT)
       r = write_fsid();
     if (r < 0) {
-      lock.Unlock();
       return r;
     }
   }
@@ -726,7 +723,6 @@ int Monitor::preinit()
               << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
     } else {
       derr << "commit suicide!" << dendl;
-      lock.Unlock();
       return -ENOENT;
     }
   }
@@ -788,7 +784,6 @@ int Monitor::preinit()
        write_default_keyring(bl);
       } else {
        derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
-       lock.Unlock();
        return r;
       }
     }
@@ -798,7 +793,8 @@ int Monitor::preinit()
   AdminSocket* admin_socket = cct->get_admin_socket();
 
   // unlock while registering to avoid mon_lock -> admin socket lock dependency.
-  lock.Unlock();
+  l.unlock();
+
   r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
                                     "show current monitor status");
   ceph_assert(r == 0);
@@ -857,7 +853,7 @@ int Monitor::preinit()
                                     "show recent slow ops");
   ceph_assert(r == 0);
 
-  lock.Lock();
+  l.lock();
 
   // add ourselves as a conf observer
   g_conf().add_observer(this);
@@ -868,7 +864,6 @@ int Monitor::preinit()
 
   auth_registry.refresh_config();
 
-  lock.Unlock();
   return 0;
 }
 
@@ -973,7 +968,7 @@ void Monitor::shutdown()
 {
   dout(1) << "shutdown" << dendl;
 
-  lock.Lock();
+  lock.lock();
 
   wait_for_paxos_write();
 
@@ -984,9 +979,9 @@ void Monitor::shutdown()
 
   state = STATE_SHUTDOWN;
 
-  lock.Unlock();
+  lock.unlock();
   g_conf().remove_observer(this);
-  lock.Lock();
+  lock.lock();
 
   if (admin_hook) {
     cct->get_admin_socket()->unregister_commands(admin_hook);
@@ -998,10 +993,10 @@ void Monitor::shutdown()
 
   mgr_client.shutdown();
 
-  lock.Unlock();
+  lock.unlock();
   finisher.wait_for_empty();
   finisher.stop();
-  lock.Lock();
+  lock.lock();
 
   // clean up
   paxos->shutdown();
@@ -1021,7 +1016,7 @@ void Monitor::shutdown()
   log_client.shutdown();
 
   // unlock before msgr shutdown...
-  lock.Unlock();
+  lock.unlock();
 
   // shutdown messenger before removing logger from perfcounter collection, 
   // otherwise _ms_dispatch() will try to update deleted logger
@@ -1045,9 +1040,9 @@ void Monitor::wait_for_paxos_write()
 {
   if (paxos->is_writing() || paxos->is_writing_previous()) {
     dout(10) << __func__ << " flushing pending write" << dendl;
-    lock.Unlock();
+    lock.unlock();
     store->flush();
-    lock.Lock();
+    lock.lock();
     dout(10) << __func__ << " flushed pending write" << dendl;
   }
 }
@@ -2215,7 +2210,7 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
         dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
         healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
               [this](int r){
-                ceph_assert(lock.is_locked_by_me());
+                ceph_assert(ceph_mutex_is_locked_by_me(lock));
                 do_health_to_clog_interval();
               }));
 
@@ -2654,14 +2649,14 @@ void Monitor::health_tick_stop()
   }
 }
 
-utime_t Monitor::health_interval_calc_next_update()
+ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
 {
-  utime_t now = ceph_clock_now();
+  auto now = ceph::real_clock::now();
 
-  time_t secs = now.sec();
-  int remainder = secs % cct->_conf->mon_health_to_clog_interval;
+  auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
+  int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
   int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
-  utime_t next = utime_t(secs + adjustment, 0);
+  auto next = secs + std::chrono::seconds(adjustment);
 
   dout(20) << __func__
     << " now: " << now << ","
@@ -2669,7 +2664,7 @@ utime_t Monitor::health_interval_calc_next_update()
     << " interval: " << cct->_conf->mon_health_to_clog_interval
     << dendl;
 
-  return next;
+  return ceph::real_clock::time_point{next};
 }
 
 void Monitor::health_interval_start()
@@ -2682,7 +2677,7 @@ void Monitor::health_interval_start()
   }
 
   health_interval_stop();
-  utime_t next = health_interval_calc_next_update();
+  auto next = health_interval_calc_next_update();
   health_interval_event = new C_MonContext(this, [this](int r) {
       if (r < 0)
         return;
index 9af1410c54f91f7b118a9c7ca219e329f826fa3b..8509b41587c3b4a2c7a344b4538b471cb926bfc7 100644 (file)
@@ -125,7 +125,7 @@ public:
   int rank;
   Messenger *messenger;
   ConnectionRef con_self;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("Monitor::lock");
   SafeTimer timer;
   Finisher finisher;
   ThreadPool cpu_tp;  ///< threadpool for CPU intensive work
@@ -667,7 +667,7 @@ public:
 
   // -- sessions --
   MonSessionMap session_map;
-  Mutex session_map_lock{"Monitor::session_map_lock"};
+  ceph::mutex session_map_lock = ceph::make_mutex("Monitor::session_map_lock");
   AdminSocketHook *admin_hook;
 
   template<typename Func, typename...Args>
@@ -727,7 +727,7 @@ public:
 
   void health_tick_start();
   void health_tick_stop();
-  utime_t health_interval_calc_next_update();
+  ceph::real_clock::time_point health_interval_calc_next_update();
   void health_interval_start();
   void health_interval_stop();
   void health_events_cleanup();
@@ -884,9 +884,8 @@ public:
   //on forwarded messages, so we create a non-locking version for this class
   void _ms_dispatch(Message *m);
   bool ms_dispatch(Message *m) override {
-    lock.Lock();
+    std::lock_guard l{lock};
     _ms_dispatch(m);
-    lock.Unlock();
     return true;
   }
   void dispatch_op(MonOpRequestRef op);
index 426f3edb2c3da8af746649a483cb8beb789404f7..7e44bce1f712173e851f6a509b71b8c0c481e9a4 100644 (file)
@@ -256,7 +256,8 @@ public:
     OSDMap::Incremental& pending_inc;
     // lock to protect pending_inc form changing
     // when checking is done
-    Mutex pending_inc_lock = {"CleanUpmapJob::pending_inc_lock"};
+    ceph::mutex pending_inc_lock =
+      ceph::make_mutex("CleanUpmapJob::pending_inc_lock");
 
     CleanUpmapJob(CephContext *cct, const OSDMap& om, OSDMap::Incremental& pi)
       : ParallelPGMapper::Job(&om),
@@ -340,7 +341,8 @@ private:
   void check_osdmap_subs();
   void share_map_with_random_osd();
 
-  Mutex prime_pg_temp_lock = {"OSDMonitor::prime_pg_temp_lock"};
+  ceph::mutex prime_pg_temp_lock =
+    ceph::make_mutex("OSDMonitor::prime_pg_temp_lock");
   struct PrimeTempJob : public ParallelPGMapper::Job {
     OSDMonitor *osdmon;
     PrimeTempJob(const OSDMap& om, OSDMonitor *m)
index 3a691931c9f2a2bc33f56144072671efa21af2cc..6ab254ad3974ed62241c88e1cb0bcf9e279bdc00 100644 (file)
@@ -723,7 +723,7 @@ void Paxos::handle_begin(MonOpRequestRef op)
 
   // set state.
   state = STATE_UPDATING;
-  lease_expire = utime_t();  // cancel lease
+  lease_expire = {};  // cancel lease
 
   // yes.
   version_t v = last_committed+1;
@@ -834,7 +834,7 @@ void Paxos::abort_commit()
   ceph_assert(commits_started > 0);
   --commits_started;
   if (commits_started == 0)
-    shutdown_cond.Signal();
+    shutdown_cond.notify_all();
 }
 
 void Paxos::commit_start()
@@ -891,7 +891,7 @@ void Paxos::commit_finish()
   // cancel lease - it was for the old value.
   //  (this would only happen if message layer lost the 'begin', but
   //   leader still got a majority and committed with out us.)
-  lease_expire = utime_t();  // cancel lease
+  lease_expire = {};  // cancel lease
 
   last_committed++;
   last_commit_time = ceph_clock_now();
@@ -969,8 +969,8 @@ void Paxos::extend_lease()
   ceph_assert(mon->is_leader());
   //assert(is_active());
 
-  lease_expire = ceph_clock_now();
-  lease_expire += g_conf()->mon_lease;
+  lease_expire = ceph::real_clock::now();
+  lease_expire += ceph::make_timespan(g_conf()->mon_lease);
   acked_lease.clear();
   acked_lease.insert(mon->rank);
 
@@ -985,7 +985,7 @@ void Paxos::extend_lease()
     MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
                                     ceph_clock_now());
     lease->last_committed = last_committed;
-    lease->lease_timestamp = lease_expire;
+    lease->lease_timestamp = utime_t{lease_expire};
     lease->first_committed = first_committed;
     mon->send_mon_message(lease, *p);
   }
@@ -1003,9 +1003,10 @@ void Paxos::extend_lease()
   }
 
   // set renew event
-  utime_t at = lease_expire;
-  at -= g_conf()->mon_lease;
-  at += g_conf()->mon_lease_renew_interval_factor * g_conf()->mon_lease;
+  auto at = lease_expire;
+  at -= ceph::make_timespan(g_conf()->mon_lease);
+  at += ceph::make_timespan(g_conf()->mon_lease_renew_interval_factor *
+                           g_conf()->mon_lease);
   lease_renew_event = mon->timer.add_event_at(
     at, new C_MonContext(mon, [this](int r) {
        if (r == -ECANCELED)
@@ -1107,12 +1108,13 @@ void Paxos::handle_lease(MonOpRequestRef op)
   warn_on_future_time(lease->sent_timestamp, lease->get_source());
 
   // extend lease
-  if (lease_expire < lease->lease_timestamp) {
-    lease_expire = lease->lease_timestamp;
+  if (auto new_expire = lease->lease_timestamp.to_real_time();
+      lease_expire < new_expire) {
+    lease_expire = new_expire;
 
-    utime_t now = ceph_clock_now();
+    auto now = ceph::real_clock::now();
     if (lease_expire < now) {
-      utime_t diff = now - lease_expire;
+      auto diff = now - lease_expire;
       derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
     }
   }
@@ -1322,8 +1324,10 @@ void Paxos::shutdown()
   // Let store finish commits in progress
   // XXX: I assume I can't use finish_contexts() because the store
   // is going to trigger
-  while(commits_started > 0)
-    shutdown_cond.Wait(mon->lock);
+  unique_lock l{mon->lock, std::adopt_lock};
+  shutdown_cond.wait(l, [this] { return commits_started <= 0; });
+  // Monitor::shutdown() will unlock it
+  l.release();
 
   finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
   finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
@@ -1353,7 +1357,7 @@ void Paxos::leader_init()
   }
 
   state = STATE_RECOVERING;
-  lease_expire = utime_t();
+  lease_expire = {};
   dout(10) << "leader_init -- starting paxos recovery" << dendl;
   collect(0);
 }
@@ -1364,7 +1368,7 @@ void Paxos::peon_init()
   new_value.clear();
 
   state = STATE_RECOVERING;
-  lease_expire = utime_t();
+  lease_expire = {};
   dout(10) << "peon_init -- i am a peon" << dendl;
 
   // start a timer, in case the leader never manages to issue a lease
@@ -1388,9 +1392,9 @@ void Paxos::restart()
 
   if (is_writing() || is_writing_previous()) {
     dout(10) << __func__ << " flushing" << dendl;
-    mon->lock.Unlock();
+    mon->lock.unlock();
     mon->store->flush();
-    mon->lock.Lock();
+    mon->lock.lock();
     dout(10) << __func__ << " flushed" << dendl;
   }
   state = STATE_RECOVERING;
@@ -1508,7 +1512,7 @@ version_t Paxos::read_current(bufferlist &bl)
 bool Paxos::is_lease_valid()
 {
   return ((mon->get_quorum().size() == 1)
-      || (ceph_clock_now() < lease_expire));
+         || (ceph::real_clock::now() < lease_expire));
 }
 
 // -- WRITE --
index 590279ed9cef07780ce9d1ac38528fd0445ebc3e..4aed2fae77f058fe0d687d95ac236b236e22310f 100644 (file)
@@ -276,7 +276,7 @@ private:
    */
   int commits_started = 0;
 
-  Cond shutdown_cond;
+  ceph::condition_variable shutdown_cond;
 
 public:
   /**
@@ -402,7 +402,7 @@ private:
    * keep leases. Each lease will have an expiration date, which may or may
    * not be extended. 
    */
-  utime_t lease_expire;
+  ceph::real_clock::time_point lease_expire;
   /**
    * List of callbacks waiting for our state to change into STATE_ACTIVE.
    */