From 85157d5aae3d0ed33c7837193fd69e6c73e37883 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 7 Jul 2019 11:15:03 +0800 Subject: [PATCH] mon: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/mon/MonClient.cc | 97 ++++++++++++++++++++------------------------ src/mon/MonClient.h | 34 ++++++++-------- src/mon/Monitor.cc | 49 ++++++++++------------ src/mon/Monitor.h | 9 ++-- src/mon/OSDMonitor.h | 6 ++- src/mon/Paxos.cc | 44 +++++++++++--------- src/mon/Paxos.h | 4 +- 7 files changed, 116 insertions(+), 127 deletions(-) diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index c3d1367131664..f955d5d766604 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -62,7 +62,6 @@ MonClient::MonClient(CephContext *cct_) : Dispatcher(cct_), AuthServer(cct_), messenger(NULL), - monc_lock("MonClient::monc_lock"), timer(cct_, monc_lock), finisher(cct_), initialized(false), @@ -93,15 +92,12 @@ int MonClient::build_initial_monmap() int MonClient::get_monmap() { ldout(cct, 10) << __func__ << dendl; - std::lock_guard l(monc_lock); + std::unique_lock l(monc_lock); sub.want("monmap", 0, 0); if (!_opened()) _reopen_session(); - - while (want_monmap) - map_cond.Wait(monc_lock); - + map_cond.wait(l, [this] { return !want_monmap; }); ldout(cct, 10) << __func__ << " done" << dendl; return 0; } @@ -113,9 +109,6 @@ int MonClient::get_monmap_and_config() int tries = 10; - utime_t interval; - interval.set_from_double(cct->_conf->mon_client_hunt_interval); - cct->init_crypto(); auto shutdown_crypto = make_scope_guard([this] { cct->shutdown_crypto(); @@ -156,7 +149,7 @@ int MonClient::get_monmap_and_config() break; } { - std::lock_guard l(monc_lock); + std::unique_lock l(monc_lock); if (monmap.get_epoch() && !monmap.persistent_features.contains_all( ceph::features::mon::FEATURE_MIMIC)) { @@ -167,7 +160,8 @@ int MonClient::get_monmap_and_config() } while ((!got_config || monmap.get_epoch() == 0) && r == 0) { ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl; - r = map_cond.WaitInterval(monc_lock, interval); + map_cond.wait_for(l, ceph::make_timespan( + cct->_conf->mon_client_hunt_interval)); } if (got_config) { ldout(cct,10) << __func__ << " success" << dendl; @@ -259,14 +253,12 @@ int MonClient::ping_monitor(const string &mon_id, string *result_reply) pinger->mc->start(monmap.get_epoch(), entity_name); con->send_message(new MPing); - pinger->lock.Lock(); int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout); if (ret == 0) { ldout(cct,10) << __func__ << " got ping reply" << dendl; } else { ret = -ret; } - pinger->lock.Unlock(); con->mark_down(); pinger->mc.reset(); @@ -412,7 +404,7 @@ void MonClient::handle_monmap(MMonMap *m) } sub.got("monmap", monmap.get_epoch()); - map_cond.Signal(); + map_cond.notify_all(); want_monmap = false; if (authenticate_err == 1) { @@ -431,7 +423,7 @@ void MonClient::handle_config(MConfig *m) m->put(); })); got_config = true; - map_cond.Signal(); + map_cond.notify_all(); } // ---------------------- @@ -478,7 +470,7 @@ int MonClient::init() void MonClient::shutdown() { ldout(cct, 10) << __func__ << dendl; - monc_lock.Lock(); + monc_lock.lock(); stopping = true; while (!version_requests.empty()) { version_requests.begin()->second->context->complete(-ECANCELED); @@ -501,22 +493,22 @@ void MonClient::shutdown() pending_cons.clear(); auth.reset(); - monc_lock.Unlock(); + monc_lock.unlock(); if (initialized) { finisher.wait_for_empty(); finisher.stop(); initialized = false; } - monc_lock.Lock(); + monc_lock.lock(); timer.shutdown(); stopping = false; - monc_lock.Unlock(); + monc_lock.unlock(); } int MonClient::authenticate(double timeout) { - std::lock_guard lock(monc_lock); + std::unique_lock lock{monc_lock}; if (active_con) { ldout(cct, 5) << "already authenticated" << dendl; @@ -527,20 +519,20 @@ int MonClient::authenticate(double timeout) if (!_opened()) _reopen_session(); - utime_t until = ceph_clock_now(); - until += timeout; + auto until = ceph::real_clock::now(); + until += ceph::make_timespan(timeout); if (timeout > 0.0) ldout(cct, 10) << "authenticate will time out at " << until << dendl; authenticate_err = 1; // == in progress while (!active_con && authenticate_err >= 0) { if (timeout > 0.0) { - int r = auth_cond.WaitUntil(monc_lock, until); - if (r == ETIMEDOUT && !active_con) { + auto r = auth_cond.wait_until(lock, until); + if (r == cv_status::timeout && !active_con) { ldout(cct, 0) << "authenticate timed out after " << timeout << dendl; - authenticate_err = -r; + authenticate_err = -ETIMEDOUT; } } else { - auth_cond.Wait(monc_lock); + auth_cond.wait(lock); } } @@ -561,7 +553,7 @@ int MonClient::authenticate(double timeout) void MonClient::handle_auth(MAuthReply *m) { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); if (!_hunting()) { std::swap(active_con->get_auth(), auth); int ret = active_con->authenticate(m); @@ -614,7 +606,7 @@ void MonClient::_finish_auth(int auth_err) ceph_assert(auth); _check_auth_tickets(); } - auth_cond.SignalAll(); + auth_cond.notify_all(); if (!auth_err) { Context *cb = nullptr; @@ -622,9 +614,9 @@ void MonClient::_finish_auth(int auth_err) cb = session_established_context.release(); } if (cb) { - monc_lock.Unlock(); + monc_lock.unlock(); cb->complete(0); - monc_lock.Lock(); + monc_lock.lock(); } } } @@ -633,7 +625,7 @@ void MonClient::_finish_auth(int auth_err) void MonClient::_send_mon_message(Message *m) { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); if (active_con) { auto cur_con = active_con->get_con(); ldout(cct, 10) << "_send_mon_message to mon." @@ -647,7 +639,7 @@ void MonClient::_send_mon_message(Message *m) void MonClient::_reopen_session(int rank) { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); ldout(cct, 10) << __func__ << " rank " << rank << dendl; active_con.reset(); @@ -786,7 +778,7 @@ bool MonClient::ms_handle_reset(Connection *con) bool MonClient::_opened() const { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); return active_con || _hunting(); } @@ -812,7 +804,7 @@ void MonClient::_start_hunting() void MonClient::_finish_hunting(int auth_err) { ldout(cct,10) << __func__ << " " << auth_err << dendl; - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); // the pending conns have been cleaned. ceph_assert(!_hunting()); if (active_con) { @@ -917,7 +909,7 @@ void MonClient::schedule_tick() void MonClient::_renew_subs() { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); if (!sub.have_new()) { ldout(cct, 10) << __func__ << " - empty" << dendl; return; @@ -943,7 +935,7 @@ void MonClient::handle_subscribe_ack(MMonSubscribeAck *m) int MonClient::_check_auth_tickets() { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); if (active_con && auth) { if (auth->need_tickets()) { ldout(cct, 10) << __func__ << " getting new tickets!" << dendl; @@ -961,7 +953,7 @@ int MonClient::_check_auth_tickets() int MonClient::_check_auth_rotating() { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); if (!rotating_secrets || !auth_principal_needs_rotating_keys(entity_name)) { ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl; @@ -1010,10 +1002,7 @@ int MonClient::_check_auth_rotating() int MonClient::wait_auth_rotating(double timeout) { - std::lock_guard l(monc_lock); - utime_t now = ceph_clock_now(); - utime_t until = now; - until += timeout; + std::unique_lock l(monc_lock); // Must be initialized ceph_assert(auth != nullptr); @@ -1024,18 +1013,18 @@ int MonClient::wait_auth_rotating(double timeout) if (!rotating_secrets) return 0; - while (auth_principal_needs_rotating_keys(entity_name) && - rotating_secrets->need_new_secrets(now)) { - if (now >= until) { - ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl; - return -ETIMEDOUT; - } - ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl; - auth_cond.WaitUntil(monc_lock, until); - now = ceph_clock_now(); + ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl; + utime_t now = ceph_clock_now(); + if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] { + return (!auth_principal_needs_rotating_keys(entity_name) || + !rotating_secrets->need_new_secrets(now)); + })) { + ldout(cct, 10) << __func__ << " done" << dendl; + return 0; + } else { + ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl; + return -ETIMEDOUT; } - ldout(cct, 10) << __func__ << " done" << dendl; - return 0; } // --------- @@ -1121,7 +1110,7 @@ void MonClient::handle_mon_command_ack(MMonCommandAck *ack) int MonClient::_cancel_mon_command(uint64_t tid) { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); auto it = mon_commands.find(tid); if (it == mon_commands.end()) { @@ -1243,7 +1232,7 @@ void MonClient::get_version(string map, version_t *newest, version_t *oldest, Co void MonClient::handle_get_version_reply(MMonGetVersionReply* m) { - ceph_assert(monc_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(monc_lock)); auto iter = version_requests.find(m->handle); if (iter == version_requests.end()) { ldout(cct, 0) << __func__ << " version request with handle " << m->handle diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index d8716fa27d529..2fa4fdcf8643a 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -132,8 +132,8 @@ private: struct MonClientPinger : public Dispatcher, public AuthClient { - Mutex lock; - Cond ping_recvd_cond; + ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock"); + ceph::condition_variable ping_recvd_cond; std::string *result; bool done; RotatingKeyRing *keyring; @@ -143,24 +143,24 @@ struct MonClientPinger : public Dispatcher, RotatingKeyRing *keyring, std::string *res_) : Dispatcher(cct_), - lock("MonClientPinger::lock"), result(res_), done(false), keyring(keyring) { } int wait_for_reply(double timeout = 0.0) { - utime_t until = ceph_clock_now(); - until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout); + std::unique_lock locker{lock}; + if (timeout <= 0) { + timeout = cct->_conf->client_mount_timeout; + } done = false; - - int ret = 0; - while (!done) { - ret = ping_recvd_cond.WaitUntil(lock, until); - if (ret == ETIMEDOUT) - break; + if (ping_recvd_cond.wait_for(locker, + ceph::make_timespan(timeout), + [this] { return done; })) { + return 0; + } else { + return ETIMEDOUT; } - return ret; } bool ms_dispatch(Message *m) override { @@ -175,14 +175,14 @@ struct MonClientPinger : public Dispatcher, decode(*result, p); } done = true; - ping_recvd_cond.SignalAll(); + ping_recvd_cond.notify_all(); m->put(); return true; } bool ms_handle_reset(Connection *con) override { std::lock_guard l(lock); done = true; - ping_recvd_cond.SignalAll(); + ping_recvd_cond.notify_all(); return true; } void ms_handle_remote_reset(Connection *con) override {} @@ -247,7 +247,7 @@ private: EntityName entity_name; - mutable Mutex monc_lock; + mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock"); SafeTimer timer; Finisher finisher; @@ -275,7 +275,7 @@ private: // monclient bool want_monmap; - Cond map_cond; + ceph::condition_variable map_cond; bool passthrough_monmap = false; bool got_config = false; @@ -283,7 +283,7 @@ private: std::unique_ptr auth; uint32_t want_keys = 0; uint64_t global_id = 0; - Cond auth_cond; + ceph::condition_variable auth_cond; int authenticate_err = 0; bool authenticated = false; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index d33b0d89d4c02..6bf7d08172446 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -136,7 +136,6 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, rank(-1), messenger(m), con_self(m ? m->get_loopback_connection() : NULL), - lock("Monitor::lock"), timer(cct_, lock), finisher(cct_, "mon_finisher", "fin"), cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads), @@ -551,7 +550,7 @@ void Monitor::handle_conf_change(const ConfigProxy& conf, changed.count("mon_health_to_clog_interval") || changed.count("mon_health_to_clog_tick_interval")) { finisher.queue(new C_MonContext(this, [this, changed](int) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; health_to_clog_update_conf(changed); })); } @@ -559,7 +558,7 @@ void Monitor::handle_conf_change(const ConfigProxy& conf, if (changed.count("mon_scrub_interval")) { int scrub_interval = conf->mon_scrub_interval; finisher.queue(new C_MonContext(this, [this, scrub_interval](int) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; scrub_update_interval(scrub_interval); })); } @@ -624,14 +623,13 @@ int Monitor::sanitize_options() int Monitor::preinit() { - lock.Lock(); + std::unique_lock l(lock); dout(1) << "preinit fsid " << monmap->fsid << dendl; int r = sanitize_options(); if (r < 0) { derr << "option sanitization failed!" << dendl; - lock.Unlock(); return r; } @@ -691,7 +689,6 @@ int Monitor::preinit() if (r == -ENOENT) r = write_fsid(); if (r < 0) { - lock.Unlock(); return r; } } @@ -726,7 +723,6 @@ int Monitor::preinit() << "'mon_force_quorum_join' is set -- allowing boot" << dendl; } else { derr << "commit suicide!" << dendl; - lock.Unlock(); return -ENOENT; } } @@ -788,7 +784,6 @@ int Monitor::preinit() write_default_keyring(bl); } else { derr << "unable to load initial keyring " << g_conf()->keyring << dendl; - lock.Unlock(); return r; } } @@ -798,7 +793,8 @@ int Monitor::preinit() AdminSocket* admin_socket = cct->get_admin_socket(); // unlock while registering to avoid mon_lock -> admin socket lock dependency. - lock.Unlock(); + l.unlock(); + r = admin_socket->register_command("mon_status", "mon_status", admin_hook, "show current monitor status"); ceph_assert(r == 0); @@ -857,7 +853,7 @@ int Monitor::preinit() "show recent slow ops"); ceph_assert(r == 0); - lock.Lock(); + l.lock(); // add ourselves as a conf observer g_conf().add_observer(this); @@ -868,7 +864,6 @@ int Monitor::preinit() auth_registry.refresh_config(); - lock.Unlock(); return 0; } @@ -973,7 +968,7 @@ void Monitor::shutdown() { dout(1) << "shutdown" << dendl; - lock.Lock(); + lock.lock(); wait_for_paxos_write(); @@ -984,9 +979,9 @@ void Monitor::shutdown() state = STATE_SHUTDOWN; - lock.Unlock(); + lock.unlock(); g_conf().remove_observer(this); - lock.Lock(); + lock.lock(); if (admin_hook) { cct->get_admin_socket()->unregister_commands(admin_hook); @@ -998,10 +993,10 @@ void Monitor::shutdown() mgr_client.shutdown(); - lock.Unlock(); + lock.unlock(); finisher.wait_for_empty(); finisher.stop(); - lock.Lock(); + lock.lock(); // clean up paxos->shutdown(); @@ -1021,7 +1016,7 @@ void Monitor::shutdown() log_client.shutdown(); // unlock before msgr shutdown... - lock.Unlock(); + lock.unlock(); // shutdown messenger before removing logger from perfcounter collection, // otherwise _ms_dispatch() will try to update deleted logger @@ -1045,9 +1040,9 @@ void Monitor::wait_for_paxos_write() { if (paxos->is_writing() || paxos->is_writing_previous()) { dout(10) << __func__ << " flushing pending write" << dendl; - lock.Unlock(); + lock.unlock(); store->flush(); - lock.Lock(); + lock.lock(); dout(10) << __func__ << " flushed pending write" << dendl; } } @@ -2215,7 +2210,7 @@ void Monitor::win_election(epoch_t epoch, set& active, uint64_t features, dout(20) << __func__ << " healthmon proposing, waiting" << dendl; healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this, [this](int r){ - ceph_assert(lock.is_locked_by_me()); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); do_health_to_clog_interval(); })); @@ -2654,14 +2649,14 @@ void Monitor::health_tick_stop() } } -utime_t Monitor::health_interval_calc_next_update() +ceph::real_clock::time_point Monitor::health_interval_calc_next_update() { - utime_t now = ceph_clock_now(); + auto now = ceph::real_clock::now(); - time_t secs = now.sec(); - int remainder = secs % cct->_conf->mon_health_to_clog_interval; + auto secs = std::chrono::duration_cast(now.time_since_epoch()); + int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval; int adjustment = cct->_conf->mon_health_to_clog_interval - remainder; - utime_t next = utime_t(secs + adjustment, 0); + auto next = secs + std::chrono::seconds(adjustment); dout(20) << __func__ << " now: " << now << "," @@ -2669,7 +2664,7 @@ utime_t Monitor::health_interval_calc_next_update() << " interval: " << cct->_conf->mon_health_to_clog_interval << dendl; - return next; + return ceph::real_clock::time_point{next}; } void Monitor::health_interval_start() @@ -2682,7 +2677,7 @@ void Monitor::health_interval_start() } health_interval_stop(); - utime_t next = health_interval_calc_next_update(); + auto next = health_interval_calc_next_update(); health_interval_event = new C_MonContext(this, [this](int r) { if (r < 0) return; diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 9af1410c54f91..8509b41587c3b 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -125,7 +125,7 @@ public: int rank; Messenger *messenger; ConnectionRef con_self; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("Monitor::lock"); SafeTimer timer; Finisher finisher; ThreadPool cpu_tp; ///< threadpool for CPU intensive work @@ -667,7 +667,7 @@ public: // -- sessions -- MonSessionMap session_map; - Mutex session_map_lock{"Monitor::session_map_lock"}; + ceph::mutex session_map_lock = ceph::make_mutex("Monitor::session_map_lock"); AdminSocketHook *admin_hook; template @@ -727,7 +727,7 @@ public: void health_tick_start(); void health_tick_stop(); - utime_t health_interval_calc_next_update(); + ceph::real_clock::time_point health_interval_calc_next_update(); void health_interval_start(); void health_interval_stop(); void health_events_cleanup(); @@ -884,9 +884,8 @@ public: //on forwarded messages, so we create a non-locking version for this class void _ms_dispatch(Message *m); bool ms_dispatch(Message *m) override { - lock.Lock(); + std::lock_guard l{lock}; _ms_dispatch(m); - lock.Unlock(); return true; } void dispatch_op(MonOpRequestRef op); diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 426f3edb2c3da..7e44bce1f7121 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -256,7 +256,8 @@ public: OSDMap::Incremental& pending_inc; // lock to protect pending_inc form changing // when checking is done - Mutex pending_inc_lock = {"CleanUpmapJob::pending_inc_lock"}; + ceph::mutex pending_inc_lock = + ceph::make_mutex("CleanUpmapJob::pending_inc_lock"); CleanUpmapJob(CephContext *cct, const OSDMap& om, OSDMap::Incremental& pi) : ParallelPGMapper::Job(&om), @@ -340,7 +341,8 @@ private: void check_osdmap_subs(); void share_map_with_random_osd(); - Mutex prime_pg_temp_lock = {"OSDMonitor::prime_pg_temp_lock"}; + ceph::mutex prime_pg_temp_lock = + ceph::make_mutex("OSDMonitor::prime_pg_temp_lock"); struct PrimeTempJob : public ParallelPGMapper::Job { OSDMonitor *osdmon; PrimeTempJob(const OSDMap& om, OSDMonitor *m) diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 3a691931c9f2a..6ab254ad3974e 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -723,7 +723,7 @@ void Paxos::handle_begin(MonOpRequestRef op) // set state. state = STATE_UPDATING; - lease_expire = utime_t(); // cancel lease + lease_expire = {}; // cancel lease // yes. version_t v = last_committed+1; @@ -834,7 +834,7 @@ void Paxos::abort_commit() ceph_assert(commits_started > 0); --commits_started; if (commits_started == 0) - shutdown_cond.Signal(); + shutdown_cond.notify_all(); } void Paxos::commit_start() @@ -891,7 +891,7 @@ void Paxos::commit_finish() // cancel lease - it was for the old value. // (this would only happen if message layer lost the 'begin', but // leader still got a majority and committed with out us.) - lease_expire = utime_t(); // cancel lease + lease_expire = {}; // cancel lease last_committed++; last_commit_time = ceph_clock_now(); @@ -969,8 +969,8 @@ void Paxos::extend_lease() ceph_assert(mon->is_leader()); //assert(is_active()); - lease_expire = ceph_clock_now(); - lease_expire += g_conf()->mon_lease; + lease_expire = ceph::real_clock::now(); + lease_expire += ceph::make_timespan(g_conf()->mon_lease); acked_lease.clear(); acked_lease.insert(mon->rank); @@ -985,7 +985,7 @@ void Paxos::extend_lease() MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, ceph_clock_now()); lease->last_committed = last_committed; - lease->lease_timestamp = lease_expire; + lease->lease_timestamp = utime_t{lease_expire}; lease->first_committed = first_committed; mon->send_mon_message(lease, *p); } @@ -1003,9 +1003,10 @@ void Paxos::extend_lease() } // set renew event - utime_t at = lease_expire; - at -= g_conf()->mon_lease; - at += g_conf()->mon_lease_renew_interval_factor * g_conf()->mon_lease; + auto at = lease_expire; + at -= ceph::make_timespan(g_conf()->mon_lease); + at += ceph::make_timespan(g_conf()->mon_lease_renew_interval_factor * + g_conf()->mon_lease); lease_renew_event = mon->timer.add_event_at( at, new C_MonContext(mon, [this](int r) { if (r == -ECANCELED) @@ -1107,12 +1108,13 @@ void Paxos::handle_lease(MonOpRequestRef op) warn_on_future_time(lease->sent_timestamp, lease->get_source()); // extend lease - if (lease_expire < lease->lease_timestamp) { - lease_expire = lease->lease_timestamp; + if (auto new_expire = lease->lease_timestamp.to_real_time(); + lease_expire < new_expire) { + lease_expire = new_expire; - utime_t now = ceph_clock_now(); + auto now = ceph::real_clock::now(); if (lease_expire < now) { - utime_t diff = now - lease_expire; + auto diff = now - lease_expire; derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl; } } @@ -1322,8 +1324,10 @@ void Paxos::shutdown() // Let store finish commits in progress // XXX: I assume I can't use finish_contexts() because the store // is going to trigger - while(commits_started > 0) - shutdown_cond.Wait(mon->lock); + unique_lock l{mon->lock, std::adopt_lock}; + shutdown_cond.wait(l, [this] { return commits_started <= 0; }); + // Monitor::shutdown() will unlock it + l.release(); finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED); finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED); @@ -1353,7 +1357,7 @@ void Paxos::leader_init() } state = STATE_RECOVERING; - lease_expire = utime_t(); + lease_expire = {}; dout(10) << "leader_init -- starting paxos recovery" << dendl; collect(0); } @@ -1364,7 +1368,7 @@ void Paxos::peon_init() new_value.clear(); state = STATE_RECOVERING; - lease_expire = utime_t(); + lease_expire = {}; dout(10) << "peon_init -- i am a peon" << dendl; // start a timer, in case the leader never manages to issue a lease @@ -1388,9 +1392,9 @@ void Paxos::restart() if (is_writing() || is_writing_previous()) { dout(10) << __func__ << " flushing" << dendl; - mon->lock.Unlock(); + mon->lock.unlock(); mon->store->flush(); - mon->lock.Lock(); + mon->lock.lock(); dout(10) << __func__ << " flushed" << dendl; } state = STATE_RECOVERING; @@ -1508,7 +1512,7 @@ version_t Paxos::read_current(bufferlist &bl) bool Paxos::is_lease_valid() { return ((mon->get_quorum().size() == 1) - || (ceph_clock_now() < lease_expire)); + || (ceph::real_clock::now() < lease_expire)); } // -- WRITE -- diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 590279ed9cef0..4aed2fae77f05 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -276,7 +276,7 @@ private: */ int commits_started = 0; - Cond shutdown_cond; + ceph::condition_variable shutdown_cond; public: /** @@ -402,7 +402,7 @@ private: * keep leases. Each lease will have an expiration date, which may or may * not be extended. */ - utime_t lease_expire; + ceph::real_clock::time_point lease_expire; /** * List of callbacks waiting for our state to change into STATE_ACTIVE. */ -- 2.39.5