#include "common/cmdparse.h"
#include "common/LogEntry.h"
-#include "common/Mutex.h"
#include "common/Thread.h"
#include "mon/health_check.h"
#include "mgr/Gil.h"
monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
client(client_), finisher(f),
cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
- server(server), py_module_registry(pmr), lock("ActivePyModules")
+ server(server), py_module_registry(pmr)
{
store_cache = std::move(store_data);
cmd_finisher.start();
auto module = i.second.get();
const auto& name = i.first;
- lock.Unlock();
+ lock.unlock();
dout(10) << "calling module " << name << " shutdown()" << dendl;
module->shutdown();
dout(10) << "module " << name << " shutdown() returned" << dendl;
- lock.Lock();
+ lock.lock();
}
// For modules implementing serve(), finish the threads where we
// were running that.
for (auto &i : modules) {
- lock.Unlock();
+ lock.unlock();
dout(10) << "joining module " << i.first << dendl;
i.second->thread.join();
dout(10) << "joined module " << i.first << dendl;
- lock.Lock();
+ lock.lock();
}
cmd_finisher.wait_for_empty();
{
bool changed = false;
- lock.Lock();
+ lock.lock();
auto p = modules.find(module_name);
if (p != modules.end()) {
changed = p->second->set_health_checks(std::move(checks));
}
- lock.Unlock();
+ lock.unlock();
// immediately schedule a report to be sent to the monitors with the new
// health checks that have changed. This is done asynchronusly to avoid
std::stringstream *ds,
std::stringstream *ss)
{
- lock.Lock();
+ lock.lock();
auto mod_iter = modules.find(module_name);
if (mod_iter == modules.end()) {
*ss << "Module '" << module_name << "' is not available";
- lock.Unlock();
+ lock.unlock();
return -ENOENT;
}
- lock.Unlock();
+ lock.unlock();
return mod_iter->second->handle_command(cmdmap, inbuf, ds, ss);
}
#include "ActivePyModule.h"
#include "common/Finisher.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "PyFormatter.h"
map<std::string,ProgressEvent> progress_events;
- mutable Mutex lock{"ActivePyModules::lock"};
+ mutable ceph::mutex lock = ceph::make_mutex("ActivePyModules::lock");
public:
ActivePyModules(PyModuleConfig &module_config,
const MgrMap& mgrmap)
: monc(monc_),
objecter(objecter_),
- lock("ClusterState"),
mgr_map(mgrmap)
{}
#include "mds/FSMap.h"
#include "mon/MgrMap.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "osdc/Objecter.h"
#include "mon/MonClient.h"
Objecter *objecter;
FSMap fsmap;
ServiceMap servicemap;
- mutable Mutex lock;
+ mutable ceph::mutex lock = ceph::make_mutex("ClusterState");
MgrMap mgr_map;
py_modules(py_modules_),
clog(clog_),
audit_clog(audit_clog_),
- lock("DaemonServer"),
pgmap_ready(false),
timer(g_ceph_context, lock),
shutting_down(false),
// fire after all modules have had a chance to set their health checks.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
if (tick_event) {
timer.cancel_event(tick_event);
void DaemonServer::_send_configure(ConnectionRef c)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
auto configure = make_message<MMgrConfigure>();
configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
#include <set>
#include <string>
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/LogClient.h"
#include "common/Timer.h"
epoch_t pending_service_map_dirty = 0;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("DaemonServer");
static void _generate_command_map(cmdmap_t& cmdmap,
map<string,string> ¶m_str_map);
void DaemonStateIndex::_erase(const DaemonKey& dmk)
{
- ceph_assert(lock.is_wlocked());
+ ceph_assert(ceph_mutex_is_wlocked(lock));
const auto to_erase = all.find(dmk);
ceph_assert(to_erase != all.end());
objecter(objecter_),
client(client_),
client_messenger(clientm_),
- lock("Mgr::lock"),
finisher(g_ceph_context, "Mgr", "mgr-fin"),
digest_received(false),
py_module_registry(py_module_registry_),
std::map<std::string, std::string> Mgr::load_store()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
dout(10) << "listing keys" << dendl;
JSONCommand cmd;
cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
- lock.Unlock();
+ lock.unlock();
cmd.wait();
- lock.Lock();
+ lock.lock();
ceph_assert(cmd.r == 0);
std::map<std::string, std::string> loaded;
std::ostringstream cmd_json;
cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
get_cmd.run(monc, cmd_json.str());
- lock.Unlock();
+ lock.unlock();
get_cmd.wait();
- lock.Lock();
+ lock.lock();
if (get_cmd.r == 0) { // tolerate racing config-key change
if (key.substr(0, device_prefix.size()) == device_prefix) {
// device/
void Mgr::init()
{
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
ceph_assert(initializing);
ceph_assert(!initialized);
monc->reopen_session();
// Start Objecter and wait for OSD map
- lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
+ lock.unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
objecter->wait_for_osd_map();
- lock.Lock();
+ lock.lock();
// Populate PGs in ClusterState
cluster_state.with_osdmap_and_pgmap([this](const OSDMap &osd_map,
// Wait for FSMap
dout(4) << "waiting for FSMap..." << dendl;
- while (!cluster_state.have_fsmap()) {
- fs_map_cond.Wait(lock);
- }
+ fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
dout(4) << "waiting for config-keys..." << dendl;
// Wait for MgrDigest...
dout(4) << "waiting for MgrDigest..." << dendl;
- while (!digest_received) {
- digest_cond.Wait(lock);
- }
+ digest_cond.wait(l, [this] { return digest_received; });
// Load module KV store
auto kv_store = load_store();
// Migrate config from KV store on luminous->mimic
// drop lock because we do blocking config sets to mon
- lock.Unlock();
+ lock.unlock();
py_module_registry->upgrade_config(monc, kv_store);
- lock.Lock();
+ lock.lock();
// assume finisher already initialized in background_init
dout(4) << "starting python modules..." << dendl;
void Mgr::load_all_metadata()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
JSONCommand mds_cmd;
mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
JSONCommand mon_cmd;
mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
- lock.Unlock();
+ lock.unlock();
mds_cmd.wait();
osd_cmd.wait();
mon_cmd.wait();
- lock.Lock();
+ lock.lock();
ceph_assert(mds_cmd.r == 0);
ceph_assert(mon_cmd.r == 0);
void Mgr::handle_osd_map()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
std::set<std::string> names_exist;
void Mgr::handle_mon_map()
{
dout(20) << __func__ << dendl;
- assert(lock.is_locked_by_me());
+ assert(ceph_mutex_is_locked_by_me(lock));
std::set<std::string> names_exist;
cluster_state.with_monmap([&] (auto &monmap) {
for (unsigned int i = 0; i < monmap.size(); i++) {
void Mgr::handle_fs_map(ref_t<MFSMap> m)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
std::set<std::string> names_exist;
const FSMap &new_fsmap = m->get_fsmap();
- fs_map_cond.Signal();
+ fs_map_cond.notify_all();
// TODO: callers (e.g. from python land) are potentially going to see
// the new fsmap before we've bothered populating all the resulting
if (!digest_received) {
digest_received = true;
- digest_cond.Signal();
+ digest_cond.notify_all();
}
}
Client *client;
Messenger *client_messenger;
- mutable Mutex lock;
+ mutable ceph::mutex lock = ceph::make_mutex("Mgr::lock");
Finisher finisher;
// Track receipt of initial data during startup
- Cond fs_map_cond;
+ ceph::condition_variable fs_map_cond;
bool digest_received;
- Cond digest_cond;
+ ceph::condition_variable digest_cond;
PyModuleRegistry *py_module_registry;
DaemonStateIndex daemon_state;
void MgrClient::shutdown()
{
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
ldout(cct, 10) << dendl;
if (connect_retry_callback) {
m->daemon_name = daemon_name;
m->service_name = service_name;
session->con->send_message2(m);
- utime_t timeout;
- timeout.set_from_double(cct->_conf.get_val<double>(
+ auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
"mgr_client_service_daemon_unregister_timeout"));
- shutdown_cond.WaitInterval(lock, timeout);
+ shutdown_cond.wait_for(l, timeout);
}
timer.shutdown();
void MgrClient::reconnect()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
if (session) {
ldout(cct, 4) << "Terminating session with "
return;
}
- if (last_connect_attempt != utime_t()) {
- utime_t now = ceph_clock_now();
- utime_t when = last_connect_attempt;
- when += cct->_conf.get_val<double>("mgr_connect_retry_interval");
+ if (!clock_t::is_zero(last_connect_attempt)) {
+ auto now = clock_t::now();
+ auto when = last_connect_attempt +
+ ceph::make_timespan(
+ cct->_conf.get_val<double>("mgr_connect_retry_interval"));
if (now < when) {
if (!connect_retry_callback) {
connect_retry_callback = timer.add_event_at(
ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
<< dendl;
- last_connect_attempt = ceph_clock_now();
+ last_connect_attempt = clock_t::now();
session.reset(new MgrSessionState());
session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ldout(cct, 20) << *m << dendl;
void MgrClient::_send_report()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ceph_assert(session);
report_callback = nullptr;
bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ldout(cct, 20) << *m << dendl;
bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
{
service_daemon = false;
- shutdown_cond.Signal();
+ shutdown_cond.notify_all();
return true;
}
bool MgrClient::handle_command_reply(ref_t<MCommandReply> m)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ldout(cct, 20) << *m << dendl;
std::unique_ptr<MgrSessionState> session;
- Mutex lock = {"MgrClient::lock"};
- Cond shutdown_cond;
+ ceph::mutex lock = ceph::make_mutex("MgrClient::lock");
+ ceph::condition_variable shutdown_cond;
uint32_t stats_period = 0;
uint32_t stats_threshold = 0;
CommandTable<MgrCommand> command_table;
- utime_t last_connect_attempt;
+ using clock_t = ceph::real_clock;
+ clock_t::time_point last_connect_attempt;
uint64_t last_config_bl_version = 0;
log_client(g_ceph_context, client_messenger.get(), &monc.monmap, LogClient::NO_FLAGS),
clog(log_client.create_channel(CLOG_CHANNEL_CLUSTER)),
audit_clog(log_client.create_channel(CLOG_CHANNEL_AUDIT)),
- lock("MgrStandby::lock"),
finisher(g_ceph_context, "MgrStandby", "mgrsb-fin"),
timer(g_ceph_context, lock),
py_module_registry(clog),
void MgrStandby::send_beacon()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
dout(20) << state_str() << dendl;
std::list<PyModuleRef> modules = py_module_registry.get_modules();
bool handled = false;
if (active_mgr) {
auto am = active_mgr;
- lock.Unlock();
+ lock.unlock();
handled = am->ms_dispatch2(m);
- lock.Lock();
+ lock.lock();
}
if (m->get_type() == MSG_MGR_MAP) {
// let this pass through for mgrc
LogClient log_client;
LogChannelRef clog, audit_clog;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("MgrStandby::lock");
Finisher finisher;
SafeTimer timer;
} // anonymous namespace
OSDPerfMetricCollector::OSDPerfMetricCollector(Listener &listener)
- : listener(listener), lock("OSDPerfMetricCollector::lock") {
+ : listener(listener) {
}
std::map<OSDPerfMetricQuery, OSDPerfMetricLimits>
#ifndef OSD_PERF_METRIC_COLLECTOR_H_
#define OSD_PERF_METRIC_COLLECTOR_H_
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "mgr/OSDPerfMetricTypes.h"
std::map<OSDPerfMetricKey, PerformanceCounters>> Counters;
Listener &listener;
- mutable Mutex lock;
+ mutable ceph::mutex lock = ceph::make_mutex("OSDPerfMetricCollector::lock");
OSDPerfMetricQueryID next_query_id = 0;
Queries queries;
Counters counters;
#include <string>
#include <vector>
#include <boost/optional.hpp>
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "Python.h"
#include "Gil.h"
#include "mon/MgrMap.h"
class PyModule
{
- mutable Mutex lock{"PyModule::lock"};
+ mutable ceph::mutex lock = ceph::make_mutex("PyModule::lock");
private:
const std::string module_name;
std::string get_site_packages();
class PyModuleConfig {
public:
- mutable Mutex lock{"PyModuleConfig::lock"};
+ mutable ceph::mutex lock = ceph::make_mutex("PyModuleConfig::lock");
std::map<std::string, std::string> config;
PyModuleConfig();
class PyModuleRegistry
{
private:
- mutable Mutex lock{"PyModuleRegistry::lock"};
+ mutable ceph::mutex lock = ceph::make_mutex("PyModuleRegistry::lock");
LogChannelRef clog;
std::map<std::string, PyModuleRef> modules;
auto module = i.second.get();
const auto& name = i.first;
dout(10) << "waiting for module " << name << " to shutdown" << dendl;
- lock.Unlock();
+ lock.unlock();
module->shutdown();
- lock.Lock();
+ lock.lock();
dout(10) << "module " << name << " shutdown" << dendl;
}
// For modules implementing serve(), finish the threads where we
// were running that.
for (auto &i : modules) {
- lock.Unlock();
+ lock.unlock();
dout(10) << "joining thread for module " << i.first << dendl;
i.second->thread.join();
dout(10) << "joined thread for module " << i.first << dendl;
- lock.Lock();
+ lock.lock();
}
modules.clear();
#include <map>
#include "common/Thread.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "mgr/Gil.h"
#include "mon/MonClient.h"
*/
class StandbyPyModuleState
{
- mutable Mutex lock{"StandbyPyModuleState::lock"};
+ mutable ceph::mutex lock = ceph::make_mutex("StandbyPyModuleState::lock");
MgrMap mgr_map;
PyModuleConfig &module_config;
class StandbyPyModules
{
private:
- mutable Mutex lock{"StandbyPyModules::lock"};
+ mutable ceph::mutex lock = ceph::make_mutex("StandbyPyModules::lock");
std::map<std::string, std::unique_ptr<StandbyPyModule>> modules;
StandbyPyModuleState state;