#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "
-ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
- std::map<std::string, std::string> store_data,
- DaemonStateIndex &ds, ClusterState &cs,
- MonClient &mc, LogChannelRef clog_,
- LogChannelRef audit_clog_, Objecter &objecter_,
- Client &client_, Finisher &f, DaemonServer &server,
- PyModuleRegistry &pmr)
- : module_config(module_config_), daemon_state(ds), cluster_state(cs),
- monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
- client(client_), finisher(f),
- cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
- server(server), py_module_registry(pmr)
+ActivePyModules::ActivePyModules(
+ PyModuleConfig &module_config_,
+ std::map<std::string, std::string> store_data,
+ bool mon_provides_kv_sub,
+ DaemonStateIndex &ds, ClusterState &cs,
+ MonClient &mc, LogChannelRef clog_,
+ LogChannelRef audit_clog_, Objecter &objecter_,
+ Client &client_, Finisher &f, DaemonServer &server,
+ PyModuleRegistry &pmr)
+: module_config(module_config_), daemon_state(ds), cluster_state(cs),
+ monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
+ client(client_), finisher(f),
+ cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
+ server(server), py_module_registry(pmr)
{
store_cache = std::move(store_data);
cmd_finisher.start();
Command set_cmd;
{
std::lock_guard l(lock);
+
+ // NOTE: this isn't strictly necessary since we'll also get an MKVData
+ // update from the mon due to our subscription *before* our command is acked.
if (val) {
store_cache[global_key] = *val;
} else {
return result;
}
+void ActivePyModules::update_kv_data(
+ const std::string prefix,
+ bool incremental,
+ const map<std::string, boost::optional<bufferlist>, std::less<>>& data)
+{
+ std::lock_guard l(lock);
+ if (!incremental) {
+ dout(10) << "full update on " << prefix << dendl;
+ auto p = store_cache.lower_bound(prefix);
+ while (p != store_cache.end() && p->first.find(prefix) == 0) {
+ dout(20) << " rm prior " << p->first << dendl;
+ p = store_cache.erase(p);
+ }
+ } else {
+ dout(10) << "incremental update on " << prefix << dendl;
+ }
+ for (auto& i : data) {
+ if (i.second) {
+ dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+ store_cache[i.first] = i.second->to_str();
+ } else {
+ dout(20) << " rm " << i.first << dendl;
+ store_cache.erase(i.first);
+ }
+ }
+}
+
PyObject* ActivePyModules::with_perf_counters(
std::function<void(PerfCounterInstance& counter_instance, PerfCounterType& counter_type, PyFormatter& f)> fct,
const std::string &svc_name,
// module class instances already created
std::map<std::string, std::shared_ptr<ActivePyModule>> modules;
PyModuleConfig &module_config;
+ bool have_local_config_map = false;
std::map<std::string, std::string> store_cache;
DaemonStateIndex &daemon_state;
ClusterState &cluster_state;
mutable ceph::mutex lock = ceph::make_mutex("ActivePyModules::lock");
public:
- ActivePyModules(PyModuleConfig &module_config,
- std::map<std::string, std::string> store_data,
- DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
- LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
- Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
+ ActivePyModules(
+ PyModuleConfig &module_config,
+ std::map<std::string, std::string> store_data,
+ bool mon_provides_kv_sub,
+ DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+ LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
+ Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
~ActivePyModules();
std::map<std::string, std::string> get_services() const;
+ void update_kv_data(
+ const std::string prefix,
+ bool incremental,
+ const map<std::string, boost::optional<bufferlist>, std::less<>>& data);
+
// Public so that MonCommandCompletion can use it
// FIXME: for send_command completion notifications,
// send it to only the module that sent the command, not everyone
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"
+#include "messages/MKVData.h"
#include "PyModule.h"
#include "Mgr.h"
dout(20) << "saw key '" << key << "'" << dendl;
- const std::string config_prefix = PyModule::config_prefix;
+ const std::string store_prefix = PyModule::config_prefix;
const std::string device_prefix = "device/";
- if (key.substr(0, config_prefix.size()) == config_prefix ||
- key.substr(0, device_prefix.size()) == device_prefix) {
+ if (key.substr(0, device_prefix.size()) == device_prefix ||
+ key.substr(0, store_prefix.size()) == store_prefix) {
dout(20) << "fetching '" << key << "'" << dendl;
Command get_cmd;
std::ostringstream cmd_json;
get_cmd.wait();
lock.lock();
if (get_cmd.r == 0) { // tolerate racing config-key change
- if (key.substr(0, device_prefix.size()) == device_prefix) {
- // device/
- string devid = key.substr(device_prefix.size());
- map<string,string> meta;
- ostringstream ss;
- string val = get_cmd.outbl.to_str();
- int r = get_json_str_map(val, ss, &meta, false);
- if (r < 0) {
- derr << __func__ << " failed to parse " << val << ": " << ss.str()
- << dendl;
- } else {
- daemon_state.with_device_create(
- devid, [&meta] (DeviceState& dev) {
- dev.set_metadata(std::move(meta));
- });
- }
- } else {
- // config/
- loaded[key] = get_cmd.outbl.to_str();
- }
+ loaded[key] = get_cmd.outbl.to_str();
}
}
}
register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
+ // Only pacific+ monitors support subscribe to kv updates
+ bool mon_allows_kv_sub = false;
+ monc->with_monmap(
+ [&](const MonMap &monmap) {
+ if (monmap.get_required_features().contains_all(
+ ceph::features::mon::FEATURE_PACIFIC)) {
+ mon_allows_kv_sub = true;
+ }
+ });
+
// subscribe to all the maps
monc->sub_want("log-info", 0, 0);
monc->sub_want("mgrdigest", 0, 0);
monc->sub_want("fsmap", 0, 0);
monc->sub_want("servicemap", 0, 0);
+ if (mon_allows_kv_sub) {
+ monc->sub_want("kv:config/", 0, 0);
+ monc->sub_want("kv:mgr/", 0, 0);
+ monc->sub_want("kv:device/", 0, 0);
+ }
dout(4) << "waiting for OSDMap..." << dendl;
// Subscribe to OSDMap update to pass on to ClusterState
dout(4) << "waiting for FSMap..." << dendl;
fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
- dout(4) << "waiting for config-keys..." << dendl;
-
// Wait for MgrDigest...
dout(4) << "waiting for MgrDigest..." << dendl;
digest_cond.wait(l, [this] { return digest_received; });
- // Load module KV store
- auto kv_store = load_store();
+ if (!mon_allows_kv_sub) {
+ dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl;
+ pre_init_store = load_store();
+ }
+ dout(4) << "initializing device state..." << dendl;
+ // Note: we only have to do this during startup because once we are
+ // active the only changes to this state will originate from one of our
+ // own modules.
+ for (auto p = pre_init_store.lower_bound("device/");
+ p != pre_init_store.end() && p->first.find("device/") == 0;
+ ++p) {
+ string devid = p->first.substr(7);
+ dout(10) << " updating " << devid << dendl;
+ map<string,string> meta;
+ ostringstream ss;
+ int r = get_json_str_map(p->second, ss, &meta, false);
+ if (r < 0) {
+ derr << __func__ << " failed to parse " << p->second << ": " << ss.str()
+ << dendl;
+ } else {
+ daemon_state.with_device_create(
+ devid, [&meta] (DeviceState& dev) {
+ dev.set_metadata(std::move(meta));
+ });
+ }
+ }
+
// assume finisher already initialized in background_init
dout(4) << "starting python modules..." << dendl;
- py_module_registry->active_start(daemon_state, cluster_state,
- kv_store, *monc, clog, audit_clog, *objecter, *client,
- finisher, server);
+ py_module_registry->active_start(
+ daemon_state, cluster_state,
+ pre_init_store, mon_allows_kv_sub,
+ *monc, clog, audit_clog, *objecter, *client,
+ finisher, server);
cluster_state.final_init();
case MSG_LOG:
handle_log(ref_cast<MLog>(m));
break;
+ case MSG_KV_DATA:
+ {
+ auto msg = ref_cast<MKVData>(m);
+ monc->sub_got("kv:"s + msg->prefix, msg->version);
+ if (!msg->data.empty()) {
+ if (initialized) {
+ py_module_registry->update_kv_data(
+ msg->prefix,
+ msg->incremental,
+ msg->data
+ );
+ } else {
+ // before we have created the ActivePyModules, we need to
+ // track the store regions we're monitoring
+ if (!msg->incremental) {
+ dout(10) << "full update on " << msg->prefix << dendl;
+ auto p = pre_init_store.lower_bound(msg->prefix);
+ while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) {
+ dout(20) << " rm prior " << p->first << dendl;
+ p = pre_init_store.erase(p);
+ }
+ } else {
+ dout(10) << "incremental update on " << msg->prefix << dendl;
+ }
+ for (auto& i : msg->data) {
+ if (i.second) {
+ dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+ pre_init_store[i.first] = i.second->to_str();
+ } else {
+ dout(20) << " rm " << i.first << dendl;
+ pre_init_store.erase(i.first);
+ }
+ }
+ }
+ }
+ }
+ break;
default:
return false;
LogChannelRef clog;
LogChannelRef audit_clog;
+ std::map<std::string, std::string> pre_init_store;
+
void load_all_metadata();
std::map<std::string, std::string> load_store();
void init();
void PyModuleRegistry::active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
+ bool mon_provides_kv_sub,
MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
Objecter &objecter_, Client &client_, Finisher &f,
DaemonServer &server)
standby_modules.reset();
}
- active_modules.reset(new ActivePyModules(
- module_config, kv_store, ds, cs, mc,
- clog_, audit_clog_, objecter_, client_, f, server,
- *this));
+ active_modules.reset(
+ new ActivePyModules(
+ module_config,
+ kv_store, mon_provides_kv_sub,
+ ds, cs, mc,
+ clog_, audit_clog_, objecter_, client_, f, server,
+ *this));
for (const auto &i : modules) {
// Anything we're skipping because of !can_run will be flagged
void handle_config(const std::string &k, const std::string &v);
void handle_config_notify();
+ void update_kv_data(
+ const std::string prefix,
+ bool incremental,
+ const map<std::string, boost::optional<bufferlist>, std::less<>>& data) {
+ ceph_assert(active_modules);
+ active_modules->update_kv_data(prefix, incremental, data);
+ }
+
/**
* Get references to all modules (whether they have loaded and/or
* errored) or not.
void active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
+ bool mon_provides_kv_sub,
MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
Objecter &objecter_, Client &client_, Finisher &f,
DaemonServer &server);