Dispatcher(cct_),
AuthServer(cct_),
messenger(NULL),
- monc_lock("MonClient::monc_lock"),
timer(cct_, monc_lock),
finisher(cct_),
initialized(false),
int MonClient::get_monmap()
{
ldout(cct, 10) << __func__ << dendl;
- std::lock_guard l(monc_lock);
+ std::unique_lock l(monc_lock);
sub.want("monmap", 0, 0);
if (!_opened())
_reopen_session();
-
- while (want_monmap)
- map_cond.Wait(monc_lock);
-
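+ // the predicate overload of wait() absorbs spurious wakeups, replacing the
+ // hand-written while loop; it returns once a monmap arrival clears want_monmap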
+ map_cond.wait(l, [this] { return !want_monmap; });
ldout(cct, 10) << __func__ << " done" << dendl;
return 0;
}
int tries = 10;
- utime_t interval;
- interval.set_from_double(cct->_conf->mon_client_hunt_interval);
-
cct->init_crypto();
auto shutdown_crypto = make_scope_guard([this] {
cct->shutdown_crypto();
break;
}
{
- std::lock_guard l(monc_lock);
+ std::unique_lock l(monc_lock);
if (monmap.get_epoch() &&
!monmap.persistent_features.contains_all(
ceph::features::mon::FEATURE_MIMIC)) {
}
while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
- r = map_cond.WaitInterval(monc_lock, interval);
+ // keep the old WaitInterval() exit semantics: the loop condition above
+ // tests r == 0, so a timeout must still end the wait instead of being
+ // silently discarded
+ if (map_cond.wait_for(l, ceph::make_timespan(
+       cct->_conf->mon_client_hunt_interval)) == std::cv_status::timeout) {
+   r = ETIMEDOUT;
+ }
}
if (got_config) {
ldout(cct,10) << __func__ << " success" << dendl;
pinger->mc->start(monmap.get_epoch(), entity_name);
con->send_message(new MPing);
- pinger->lock.Lock();
int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
if (ret == 0) {
ldout(cct,10) << __func__ << " got ping reply" << dendl;
} else {
ret = -ret;
}
- pinger->lock.Unlock();
con->mark_down();
pinger->mc.reset();
}
sub.got("monmap", monmap.get_epoch());
- map_cond.Signal();
+ map_cond.notify_all();
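+ // the old Cond::Signal() was a pthread_cond_broadcast() internally, so
+ // notify_all() (not notify_one()) is the faithful replacement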
want_monmap = false;
if (authenticate_err == 1) {
m->put();
}));
got_config = true;
- map_cond.Signal();
+ map_cond.notify_all();
}
// ----------------------
void MonClient::shutdown()
{
ldout(cct, 10) << __func__ << dendl;
- monc_lock.Lock();
+ monc_lock.lock();
stopping = true;
while (!version_requests.empty()) {
version_requests.begin()->second->context->complete(-ECANCELED);
pending_cons.clear();
auth.reset();
- monc_lock.Unlock();
+ monc_lock.unlock();
if (initialized) {
finisher.wait_for_empty();
finisher.stop();
initialized = false;
}
- monc_lock.Lock();
+ monc_lock.lock();
timer.shutdown();
stopping = false;
- monc_lock.Unlock();
+ monc_lock.unlock();
}
int MonClient::authenticate(double timeout)
{
- std::lock_guard lock(monc_lock);
+ std::unique_lock lock{monc_lock};
if (active_con) {
ldout(cct, 5) << "already authenticated" << dendl;
if (!_opened())
_reopen_session();
- utime_t until = ceph_clock_now();
- until += timeout;
+ auto until = ceph::real_clock::now();
+ until += ceph::make_timespan(timeout);
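+ // condition_variable::wait_until() takes a chrono time_point, so the old
+ // utime_t arithmetic becomes real_clock::now() + make_timespan()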
if (timeout > 0.0)
ldout(cct, 10) << "authenticate will time out at " << until << dendl;
authenticate_err = 1; // == in progress
while (!active_con && authenticate_err >= 0) {
if (timeout > 0.0) {
- int r = auth_cond.WaitUntil(monc_lock, until);
- if (r == ETIMEDOUT && !active_con) {
+ auto r = auth_cond.wait_until(lock, until);
+ if (r == std::cv_status::timeout && !active_con) {
ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
- authenticate_err = -r;
+ authenticate_err = -ETIMEDOUT;
}
} else {
- auth_cond.Wait(monc_lock);
+ auth_cond.wait(lock);
}
}
void MonClient::handle_auth(MAuthReply *m)
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
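+ // in release builds ceph::mutex is a bare std::mutex and
+ // ceph_mutex_is_locked() constant-folds to true, so these asserts are free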
if (!_hunting()) {
std::swap(active_con->get_auth(), auth);
int ret = active_con->authenticate(m);
ceph_assert(auth);
_check_auth_tickets();
}
- auth_cond.SignalAll();
+ auth_cond.notify_all();
if (!auth_err) {
Context *cb = nullptr;
cb = session_established_context.release();
}
if (cb) {
- monc_lock.Unlock();
+ monc_lock.unlock();
cb->complete(0);
- monc_lock.Lock();
+ monc_lock.lock();
}
}
}
void MonClient::_send_mon_message(Message *m)
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
if (active_con) {
auto cur_con = active_con->get_con();
ldout(cct, 10) << "_send_mon_message to mon."
void MonClient::_reopen_session(int rank)
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
ldout(cct, 10) << __func__ << " rank " << rank << dendl;
active_con.reset();
bool MonClient::_opened() const
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
return active_con || _hunting();
}
void MonClient::_finish_hunting(int auth_err)
{
ldout(cct,10) << __func__ << " " << auth_err << dendl;
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
// the pending conns have been cleaned.
ceph_assert(!_hunting());
if (active_con) {
void MonClient::_renew_subs()
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
if (!sub.have_new()) {
ldout(cct, 10) << __func__ << " - empty" << dendl;
return;
int MonClient::_check_auth_tickets()
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
if (active_con && auth) {
if (auth->need_tickets()) {
ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
int MonClient::_check_auth_rotating()
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
if (!rotating_secrets ||
!auth_principal_needs_rotating_keys(entity_name)) {
ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
int MonClient::wait_auth_rotating(double timeout)
{
- std::lock_guard l(monc_lock);
- utime_t now = ceph_clock_now();
- utime_t until = now;
- until += timeout;
+ std::unique_lock l(monc_lock);
// Must be initialized
ceph_assert(auth != nullptr);
if (!rotating_secrets)
return 0;
- while (auth_principal_needs_rotating_keys(entity_name) &&
- rotating_secrets->need_new_secrets(now)) {
- if (now >= until) {
- ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
- return -ETIMEDOUT;
- }
- ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
- auth_cond.WaitUntil(monc_lock, until);
- now = ceph_clock_now();
+ ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
+ utime_t now = ceph_clock_now();
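+ // note: 'now' is captured by value, so the predicate keeps evaluating the
+ // secrets against the entry timestamp rather than a refreshed clock as the
+ // old loop did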
+ if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
+ return (!auth_principal_needs_rotating_keys(entity_name) ||
+ !rotating_secrets->need_new_secrets(now));
+ })) {
+ ldout(cct, 10) << __func__ << " done" << dendl;
+ return 0;
+ } else {
+ ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
+ return -ETIMEDOUT;
}
- ldout(cct, 10) << __func__ << " done" << dendl;
- return 0;
}
// ---------
int MonClient::_cancel_mon_command(uint64_t tid)
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
auto it = mon_commands.find(tid);
if (it == mon_commands.end()) {
void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
{
- ceph_assert(monc_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
auto iter = version_requests.find(m->handle);
if (iter == version_requests.end()) {
ldout(cct, 0) << __func__ << " version request with handle " << m->handle
struct MonClientPinger : public Dispatcher,
public AuthClient {
- Mutex lock;
- Cond ping_recvd_cond;
+ ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
+ ceph::condition_variable ping_recvd_cond;
std::string *result;
bool done;
RotatingKeyRing *keyring;
RotatingKeyRing *keyring,
std::string *res_) :
Dispatcher(cct_),
- lock("MonClientPinger::lock"),
result(res_),
done(false),
keyring(keyring)
{ }
int wait_for_reply(double timeout = 0.0) {
- utime_t until = ceph_clock_now();
- until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
+ std::unique_lock locker{lock};
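+ // the lock is now taken here instead of by the caller, which is why
+ // ping_monitor() dropped its explicit pinger->lock Lock()/Unlock() pair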
+ if (timeout <= 0) {
+ timeout = cct->_conf->client_mount_timeout;
+ }
done = false;
-
- int ret = 0;
- while (!done) {
- ret = ping_recvd_cond.WaitUntil(lock, until);
- if (ret == ETIMEDOUT)
- break;
+ if (ping_recvd_cond.wait_for(locker,
+ ceph::make_timespan(timeout),
+ [this] { return done; })) {
+ return 0;
+ } else {
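+ // return the positive errno just as the old WaitUntil() did; the caller
+ // (ping_monitor) negates it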
+ return ETIMEDOUT;
}
- return ret;
}
bool ms_dispatch(Message *m) override {
decode(*result, p);
}
done = true;
- ping_recvd_cond.SignalAll();
+ ping_recvd_cond.notify_all();
m->put();
return true;
}
bool ms_handle_reset(Connection *con) override {
std::lock_guard l(lock);
done = true;
- ping_recvd_cond.SignalAll();
+ ping_recvd_cond.notify_all();
return true;
}
void ms_handle_remote_reset(Connection *con) override {}
EntityName entity_name;
- mutable Mutex monc_lock;
+ mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
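+ // ceph::make_mutex() keeps the lockdep name in debug builds and collapses
+ // to a plain std::mutex in release builds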
SafeTimer timer;
Finisher finisher;
// monclient
bool want_monmap;
- Cond map_cond;
+ ceph::condition_variable map_cond;
bool passthrough_monmap = false;
bool got_config = false;
std::unique_ptr<AuthClientHandler> auth;
uint32_t want_keys = 0;
uint64_t global_id = 0;
- Cond auth_cond;
+ ceph::condition_variable auth_cond;
int authenticate_err = 0;
bool authenticated = false;
rank(-1),
messenger(m),
con_self(m ? m->get_loopback_connection() : NULL),
- lock("Monitor::lock"),
timer(cct_, lock),
finisher(cct_, "mon_finisher", "fin"),
cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
changed.count("mon_health_to_clog_interval") ||
changed.count("mon_health_to_clog_tick_interval")) {
finisher.queue(new C_MonContext(this, [this, changed](int) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
health_to_clog_update_conf(changed);
}));
}
if (changed.count("mon_scrub_interval")) {
int scrub_interval = conf->mon_scrub_interval;
finisher.queue(new C_MonContext(this, [this, scrub_interval](int) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
scrub_update_interval(scrub_interval);
}));
}
int Monitor::preinit()
{
- lock.Lock();
+ std::unique_lock l(lock);
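+ // RAII: the unique_lock releases on every early return below, which is
+ // why the explicit lock.Unlock() calls on the error paths are gone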
dout(1) << "preinit fsid " << monmap->fsid << dendl;
int r = sanitize_options();
if (r < 0) {
derr << "option sanitization failed!" << dendl;
- lock.Unlock();
return r;
}
if (r == -ENOENT)
r = write_fsid();
if (r < 0) {
- lock.Unlock();
return r;
}
}
<< "'mon_force_quorum_join' is set -- allowing boot" << dendl;
} else {
derr << "commit suicide!" << dendl;
- lock.Unlock();
return -ENOENT;
}
}
write_default_keyring(bl);
} else {
derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
- lock.Unlock();
return r;
}
}
AdminSocket* admin_socket = cct->get_admin_socket();
// unlock while registering to avoid mon_lock -> admin socket lock dependency.
- lock.Unlock();
+ l.unlock();
+
r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
"show current monitor status");
ceph_assert(r == 0);
"show recent slow ops");
ceph_assert(r == 0);
- lock.Lock();
+ l.lock();
// add ourselves as a conf observer
g_conf().add_observer(this);
auth_registry.refresh_config();
- lock.Unlock();
return 0;
}
{
dout(1) << "shutdown" << dendl;
- lock.Lock();
+ lock.lock();
wait_for_paxos_write();
state = STATE_SHUTDOWN;
- lock.Unlock();
+ lock.unlock();
g_conf().remove_observer(this);
- lock.Lock();
+ lock.lock();
if (admin_hook) {
cct->get_admin_socket()->unregister_commands(admin_hook);
mgr_client.shutdown();
- lock.Unlock();
+ lock.unlock();
finisher.wait_for_empty();
finisher.stop();
- lock.Lock();
+ lock.lock();
// clean up
paxos->shutdown();
log_client.shutdown();
// unlock before msgr shutdown...
- lock.Unlock();
+ lock.unlock();
// shutdown messenger before removing logger from perfcounter collection,
// otherwise _ms_dispatch() will try to update deleted logger
{
if (paxos->is_writing() || paxos->is_writing_previous()) {
dout(10) << __func__ << " flushing pending write" << dendl;
- lock.Unlock();
+ lock.unlock();
store->flush();
- lock.Lock();
+ lock.lock();
dout(10) << __func__ << " flushed pending write" << dendl;
}
}
dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
[this](int r){
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
do_health_to_clog_interval();
}));
}
}
-utime_t Monitor::health_interval_calc_next_update()
+ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
{
- utime_t now = ceph_clock_now();
+ auto now = ceph::real_clock::now();
- time_t secs = now.sec();
- int remainder = secs % cct->_conf->mon_health_to_clog_interval;
+ auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
+ int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
- utime_t next = utime_t(secs + adjustment, 0);
+ auto next = secs + std::chrono::seconds(adjustment);
dout(20) << __func__
<< " now: " << now << ","
<< " interval: " << cct->_conf->mon_health_to_clog_interval
<< dendl;
- return next;
+ return ceph::real_clock::time_point{next};
}
void Monitor::health_interval_start()
}
health_interval_stop();
- utime_t next = health_interval_calc_next_update();
+ auto next = health_interval_calc_next_update();
health_interval_event = new C_MonContext(this, [this](int r) {
if (r < 0)
return;
int rank;
Messenger *messenger;
ConnectionRef con_self;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("Monitor::lock");
SafeTimer timer;
Finisher finisher;
ThreadPool cpu_tp; ///< threadpool for CPU intensive work
// -- sessions --
MonSessionMap session_map;
- Mutex session_map_lock{"Monitor::session_map_lock"};
+ ceph::mutex session_map_lock = ceph::make_mutex("Monitor::session_map_lock");
AdminSocketHook *admin_hook;
template<typename Func, typename...Args>
void health_tick_start();
void health_tick_stop();
- utime_t health_interval_calc_next_update();
+ ceph::real_clock::time_point health_interval_calc_next_update();
void health_interval_start();
void health_interval_stop();
void health_events_cleanup();
//on forwarded messages, so we create a non-locking version for this class
void _ms_dispatch(Message *m);
bool ms_dispatch(Message *m) override {
- lock.Lock();
+ std::lock_guard l{lock};
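+ // the guard releases after 'return true', matching the old explicit Unlock()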
_ms_dispatch(m);
- lock.Unlock();
return true;
}
void dispatch_op(MonOpRequestRef op);
OSDMap::Incremental& pending_inc;
// lock to protect pending_inc form changing
// when checking is done
- Mutex pending_inc_lock = {"CleanUpmapJob::pending_inc_lock"};
+ ceph::mutex pending_inc_lock =
+ ceph::make_mutex("CleanUpmapJob::pending_inc_lock");
CleanUpmapJob(CephContext *cct, const OSDMap& om, OSDMap::Incremental& pi)
: ParallelPGMapper::Job(&om),
void check_osdmap_subs();
void share_map_with_random_osd();
- Mutex prime_pg_temp_lock = {"OSDMonitor::prime_pg_temp_lock"};
+ ceph::mutex prime_pg_temp_lock =
+ ceph::make_mutex("OSDMonitor::prime_pg_temp_lock");
struct PrimeTempJob : public ParallelPGMapper::Job {
OSDMonitor *osdmon;
PrimeTempJob(const OSDMap& om, OSDMonitor *m)
// set state.
state = STATE_UPDATING;
- lease_expire = utime_t(); // cancel lease
+ lease_expire = {}; // cancel lease
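+ // (a value-initialized time_point is the clock epoch, which
+ // is_lease_valid() treats as expired)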
// yes.
version_t v = last_committed+1;
ceph_assert(commits_started > 0);
--commits_started;
if (commits_started == 0)
- shutdown_cond.Signal();
+ shutdown_cond.notify_all();
}
void Paxos::commit_start()
// cancel lease - it was for the old value.
// (this would only happen if message layer lost the 'begin', but
// leader still got a majority and committed with out us.)
- lease_expire = utime_t(); // cancel lease
+ lease_expire = {}; // cancel lease
last_committed++;
last_commit_time = ceph_clock_now();
ceph_assert(mon->is_leader());
//assert(is_active());
- lease_expire = ceph_clock_now();
- lease_expire += g_conf()->mon_lease;
+ lease_expire = ceph::real_clock::now();
+ lease_expire += ceph::make_timespan(g_conf()->mon_lease);
acked_lease.clear();
acked_lease.insert(mon->rank);
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
ceph_clock_now());
lease->last_committed = last_committed;
- lease->lease_timestamp = lease_expire;
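+ // MMonPaxos still carries utime_t on the wire, so convert back at the
+ // message boundary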
+ lease->lease_timestamp = utime_t{lease_expire};
lease->first_committed = first_committed;
mon->send_mon_message(lease, *p);
}
}
// set renew event
- utime_t at = lease_expire;
- at -= g_conf()->mon_lease;
- at += g_conf()->mon_lease_renew_interval_factor * g_conf()->mon_lease;
+ auto at = lease_expire;
+ at -= ceph::make_timespan(g_conf()->mon_lease);
+ at += ceph::make_timespan(g_conf()->mon_lease_renew_interval_factor *
+ g_conf()->mon_lease);
lease_renew_event = mon->timer.add_event_at(
at, new C_MonContext(mon, [this](int r) {
if (r == -ECANCELED)
warn_on_future_time(lease->sent_timestamp, lease->get_source());
// extend lease
- if (lease_expire < lease->lease_timestamp) {
- lease_expire = lease->lease_timestamp;
+ if (auto new_expire = lease->lease_timestamp.to_real_time();
+ lease_expire < new_expire) {
+ lease_expire = new_expire;
- utime_t now = ceph_clock_now();
+ auto now = ceph::real_clock::now();
if (lease_expire < now) {
- utime_t diff = now - lease_expire;
+ auto diff = now - lease_expire;
derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
}
}
// Let store finish commits in progress
// XXX: I assume I can't use finish_contexts() because the store
// is going to trigger
- while(commits_started > 0)
- shutdown_cond.Wait(mon->lock);
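+ // adopt_lock: wrap the already-held mon->lock so the condition variable
+ // can wait on it; release() below hands ownership back without unlocking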
+ std::unique_lock l{mon->lock, std::adopt_lock};
+ shutdown_cond.wait(l, [this] { return commits_started <= 0; });
+ // Monitor::shutdown() will unlock it
+ l.release();
finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
}
state = STATE_RECOVERING;
- lease_expire = utime_t();
+ lease_expire = {};
dout(10) << "leader_init -- starting paxos recovery" << dendl;
collect(0);
}
new_value.clear();
state = STATE_RECOVERING;
- lease_expire = utime_t();
+ lease_expire = {};
dout(10) << "peon_init -- i am a peon" << dendl;
// start a timer, in case the leader never manages to issue a lease
if (is_writing() || is_writing_previous()) {
dout(10) << __func__ << " flushing" << dendl;
- mon->lock.Unlock();
+ mon->lock.unlock();
mon->store->flush();
- mon->lock.Lock();
+ mon->lock.lock();
dout(10) << __func__ << " flushed" << dendl;
}
state = STATE_RECOVERING;
bool Paxos::is_lease_valid()
{
return ((mon->get_quorum().size() == 1)
- || (ceph_clock_now() < lease_expire));
+ || (ceph::real_clock::now() < lease_expire));
}
// -- WRITE --
*/
int commits_started = 0;
- Cond shutdown_cond;
+ ceph::condition_variable shutdown_cond;
public:
/**
* keep leases. Each lease will have an expiration date, which may or may
* not be extended.
*/
- utime_t lease_expire;
+ ceph::real_clock::time_point lease_expire;
/**
* List of callbacks waiting for our state to change into STATE_ACTIVE.
*/