Include the config/ prefix (which we weren't loading before).
Before we are active, we collect these changes, and then pass them to
the ActivePyModules ctor. No change in functionality here, except that
when we make a change from a mgr module, we'll (1) put it in our local
cache store, (2) send the mon command to set it, and (3) get a notification
that updates the same value. Since this whole process is synchronous (see
ActivePyModules::set_store()), and the notification will generally arrive
*before* the command ack, there is no change in behavior.
If the mon cluster is not yet pacific, we still need to load
kv values the old way. If it's a mixed-mon cluster (and, e.g., our
current mon has the feature but not all of them do), we'll get this data
both ways, but no harm is done.
Signed-off-by: Sage Weil <sage@newdream.net>
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "
-ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
- std::map<std::string, std::string> store_data,
- DaemonStateIndex &ds, ClusterState &cs,
- MonClient &mc, LogChannelRef clog_,
- LogChannelRef audit_clog_, Objecter &objecter_,
- Client &client_, Finisher &f, DaemonServer &server,
- PyModuleRegistry &pmr)
- : module_config(module_config_), daemon_state(ds), cluster_state(cs),
- monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
- client(client_), finisher(f),
- cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
- server(server), py_module_registry(pmr)
+ActivePyModules::ActivePyModules(
+ PyModuleConfig &module_config_,
+ std::map<std::string, std::string> store_data,
+ bool mon_provides_kv_sub,
+ DaemonStateIndex &ds, ClusterState &cs,
+ MonClient &mc, LogChannelRef clog_,
+ LogChannelRef audit_clog_, Objecter &objecter_,
+ Client &client_, Finisher &f, DaemonServer &server,
+ PyModuleRegistry &pmr)
+: module_config(module_config_), daemon_state(ds), cluster_state(cs),
+ monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
+ client(client_), finisher(f),
+ cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
+ server(server), py_module_registry(pmr)
{
store_cache = std::move(store_data);
cmd_finisher.start();
Command set_cmd;
{
std::lock_guard l(lock);
+
+ // NOTE: this isn't strictly necessary since we'll also get an MKVData
+ // update from the mon due to our subscription *before* our command is acked.
if (val) {
store_cache[global_key] = *val;
} else {
return result;
}
+void ActivePyModules::update_kv_data(
+ const std::string prefix,
+ bool incremental,
+ const map<std::string, boost::optional<bufferlist>, std::less<>>& data)
+{
+ std::lock_guard l(lock);
+ if (!incremental) {
+ dout(10) << "full update on " << prefix << dendl;
+ auto p = store_cache.lower_bound(prefix);
+ while (p != store_cache.end() && p->first.find(prefix) == 0) {
+ dout(20) << " rm prior " << p->first << dendl;
+ p = store_cache.erase(p);
+ }
+ } else {
+ dout(10) << "incremental update on " << prefix << dendl;
+ }
+ for (auto& i : data) {
+ if (i.second) {
+ dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+ store_cache[i.first] = i.second->to_str();
+ } else {
+ dout(20) << " rm " << i.first << dendl;
+ store_cache.erase(i.first);
+ }
+ }
+}
+
PyObject* ActivePyModules::with_perf_counters(
std::function<void(PerfCounterInstance& counter_instance, PerfCounterType& counter_type, PyFormatter& f)> fct,
const std::string &svc_name,
// module class instances already created
std::map<std::string, std::shared_ptr<ActivePyModule>> modules;
PyModuleConfig &module_config;
+ bool have_local_config_map = false;
std::map<std::string, std::string> store_cache;
DaemonStateIndex &daemon_state;
ClusterState &cluster_state;
mutable ceph::mutex lock = ceph::make_mutex("ActivePyModules::lock");
public:
- ActivePyModules(PyModuleConfig &module_config,
- std::map<std::string, std::string> store_data,
- DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
- LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
- Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
+ ActivePyModules(
+ PyModuleConfig &module_config,
+ std::map<std::string, std::string> store_data,
+ bool mon_provides_kv_sub,
+ DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+ LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
+ Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
~ActivePyModules();
std::map<std::string, std::string> get_services() const;
+ void update_kv_data(
+ const std::string prefix,
+ bool incremental,
+ const map<std::string, boost::optional<bufferlist>, std::less<>>& data);
+
// Public so that MonCommandCompletion can use it
// FIXME: for send_command completion notifications,
// send it to only the module that sent the command, not everyone
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"
+#include "messages/MKVData.h"
#include "PyModule.h"
#include "Mgr.h"
dout(20) << "saw key '" << key << "'" << dendl;
- const std::string config_prefix = PyModule::config_prefix;
+ const std::string store_prefix = PyModule::config_prefix;
const std::string device_prefix = "device/";
- if (key.substr(0, config_prefix.size()) == config_prefix ||
- key.substr(0, device_prefix.size()) == device_prefix) {
+ if (key.substr(0, device_prefix.size()) == device_prefix ||
+ key.substr(0, store_prefix.size()) == store_prefix) {
dout(20) << "fetching '" << key << "'" << dendl;
Command get_cmd;
std::ostringstream cmd_json;
get_cmd.wait();
lock.lock();
if (get_cmd.r == 0) { // tolerate racing config-key change
- if (key.substr(0, device_prefix.size()) == device_prefix) {
- // device/
- string devid = key.substr(device_prefix.size());
- map<string,string> meta;
- ostringstream ss;
- string val = get_cmd.outbl.to_str();
- int r = get_json_str_map(val, ss, &meta, false);
- if (r < 0) {
- derr << __func__ << " failed to parse " << val << ": " << ss.str()
- << dendl;
- } else {
- daemon_state.with_device_create(
- devid, [&meta] (DeviceState& dev) {
- dev.set_metadata(std::move(meta));
- });
- }
- } else {
- // config/
- loaded[key] = get_cmd.outbl.to_str();
- }
+ loaded[key] = get_cmd.outbl.to_str();
}
}
}
register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
+ // Only pacific+ monitors support subscribe to kv updates
+ bool mon_allows_kv_sub = false;
+ monc->with_monmap(
+ [&](const MonMap &monmap) {
+ if (monmap.get_required_features().contains_all(
+ ceph::features::mon::FEATURE_PACIFIC)) {
+ mon_allows_kv_sub = true;
+ }
+ });
+
// subscribe to all the maps
monc->sub_want("log-info", 0, 0);
monc->sub_want("mgrdigest", 0, 0);
monc->sub_want("fsmap", 0, 0);
monc->sub_want("servicemap", 0, 0);
+ if (mon_allows_kv_sub) {
+ monc->sub_want("kv:config/", 0, 0);
+ monc->sub_want("kv:mgr/", 0, 0);
+ monc->sub_want("kv:device/", 0, 0);
+ }
dout(4) << "waiting for OSDMap..." << dendl;
// Subscribe to OSDMap update to pass on to ClusterState
dout(4) << "waiting for FSMap..." << dendl;
fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
- dout(4) << "waiting for config-keys..." << dendl;
-
// Wait for MgrDigest...
dout(4) << "waiting for MgrDigest..." << dendl;
digest_cond.wait(l, [this] { return digest_received; });
- // Load module KV store
- auto kv_store = load_store();
+ if (!mon_allows_kv_sub) {
+ dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl;
+ pre_init_store = load_store();
+ }
+ dout(4) << "initializing device state..." << dendl;
+ // Note: we only have to do this during startup because once we are
+ // active the only changes to this state will originate from one of our
+ // own modules.
+ for (auto p = pre_init_store.lower_bound("device/");
+ p != pre_init_store.end() && p->first.find("device/") == 0;
+ ++p) {
+ string devid = p->first.substr(7);
+ dout(10) << " updating " << devid << dendl;
+ map<string,string> meta;
+ ostringstream ss;
+ int r = get_json_str_map(p->second, ss, &meta, false);
+ if (r < 0) {
+ derr << __func__ << " failed to parse " << p->second << ": " << ss.str()
+ << dendl;
+ } else {
+ daemon_state.with_device_create(
+ devid, [&meta] (DeviceState& dev) {
+ dev.set_metadata(std::move(meta));
+ });
+ }
+ }
+
// assume finisher already initialized in background_init
dout(4) << "starting python modules..." << dendl;
- py_module_registry->active_start(daemon_state, cluster_state,
- kv_store, *monc, clog, audit_clog, *objecter, *client,
- finisher, server);
+ py_module_registry->active_start(
+ daemon_state, cluster_state,
+ pre_init_store, mon_allows_kv_sub,
+ *monc, clog, audit_clog, *objecter, *client,
+ finisher, server);
cluster_state.final_init();
case MSG_LOG:
handle_log(ref_cast<MLog>(m));
break;
+ case MSG_KV_DATA:
+ {
+ auto msg = ref_cast<MKVData>(m);
+ monc->sub_got("kv:"s + msg->prefix, msg->version);
+ if (!msg->data.empty()) {
+ if (initialized) {
+ py_module_registry->update_kv_data(
+ msg->prefix,
+ msg->incremental,
+ msg->data
+ );
+ } else {
+ // before we have created the ActivePyModules, we need to
+ // track the store regions we're monitoring
+ if (!msg->incremental) {
+ dout(10) << "full update on " << msg->prefix << dendl;
+ auto p = pre_init_store.lower_bound(msg->prefix);
+ while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) {
+ dout(20) << " rm prior " << p->first << dendl;
+ p = pre_init_store.erase(p);
+ }
+ } else {
+ dout(10) << "incremental update on " << msg->prefix << dendl;
+ }
+ for (auto& i : msg->data) {
+ if (i.second) {
+ dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+ pre_init_store[i.first] = i.second->to_str();
+ } else {
+ dout(20) << " rm " << i.first << dendl;
+ pre_init_store.erase(i.first);
+ }
+ }
+ }
+ }
+ }
+ break;
default:
return false;
LogChannelRef clog;
LogChannelRef audit_clog;
+ std::map<std::string, std::string> pre_init_store;
+
void load_all_metadata();
std::map<std::string, std::string> load_store();
void init();
void PyModuleRegistry::active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
+ bool mon_provides_kv_sub,
MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
Objecter &objecter_, Client &client_, Finisher &f,
DaemonServer &server)
standby_modules.reset();
}
- active_modules.reset(new ActivePyModules(
- module_config, kv_store, ds, cs, mc,
- clog_, audit_clog_, objecter_, client_, f, server,
- *this));
+ active_modules.reset(
+ new ActivePyModules(
+ module_config,
+ kv_store, mon_provides_kv_sub,
+ ds, cs, mc,
+ clog_, audit_clog_, objecter_, client_, f, server,
+ *this));
for (const auto &i : modules) {
// Anything we're skipping because of !can_run will be flagged
void handle_config(const std::string &k, const std::string &v);
void handle_config_notify();
+ void update_kv_data(
+ const std::string prefix,
+ bool incremental,
+ const map<std::string, boost::optional<bufferlist>, std::less<>>& data) {
+ ceph_assert(active_modules);
+ active_modules->update_kv_data(prefix, incremental, data);
+ }
+
/**
* Get references to all modules (whether they have loaded and/or
* errored) or not.
void active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
+ bool mon_provides_kv_sub,
MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
Objecter &objecter_, Client &client_, Finisher &f,
DaemonServer &server);