assert(r >= 0);
}
-int md_config_t::set_mon_vals(CephContext *cct, const map<string,string>& kv)
+int md_config_t::set_mon_vals(CephContext *cct,
+ const map<string,string>& kv,
+ config_callback config_cb)
{
Mutex::Locker l(lock);
ignored_mon_values.clear();
+
+ if (!config_cb) {
+ ldout(cct, 4) << __func__ << " no callback set" << dendl;
+ }
+
for (auto& i : kv) {
+ if (config_cb && config_cb(i.first, i.second)) {
+ ldout(cct, 4) << __func__ << " callback consumed " << i.first << dendl;
+ continue;
+ } else {
+ ldout(cct, 4) << __func__ << " callback ignored " << i.first << dendl;
+ }
const Option *o = find_option(i.first);
if (!o) {
ldout(cct,10) << __func__ << " " << i.first << " = " << i.second
bool md_config_t::*,
entity_addr_t md_config_t::*,
uuid_d md_config_t::*> member_ptr_t;
+
+ // For use when intercepting configuration updates
+ typedef std::function<bool(
+ const std::string &k, const std::string &v)> config_callback;
+
/// true if we are a daemon (as per CephContext::code_env)
const bool is_daemon;
void set_val_default(const std::string& key, const std::string &val);
/// Set a values from mon
- int set_mon_vals(CephContext *cct, const map<std::string,std::string>& kv);
+ int set_mon_vals(CephContext *cct,
+ const map<std::string,std::string>& kv,
+ config_callback config_cb);
// Called by the Ceph daemons to make configuration changes at runtime
int injectargs(const std::string &s, std::ostream *oss);
// For ::config_prefix
#include "PyModuleRegistry.h"
+#include "PyModule.h"
#include "ActivePyModules.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
-#define dout_prefix *_dout << "mgr " << __func__ << " "
+#define dout_prefix *_dout << "mgr " << __func__ << " "
-
-ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
+ActivePyModules::ActivePyModules(PyModuleConfig &module_config,
DaemonStateIndex &ds, ClusterState &cs,
MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
Client &client_, Finisher &f)
- : config_cache(config_), daemon_state(ds), cluster_state(cs),
+ : config_cache(module_config), daemon_state(ds), cluster_state(cs),
monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
lock("ActivePyModules")
-{}
+{
+ for(auto &i: config_cache.config) {
+ auto last_slash = i.first.rfind('/');
+ const std::string module_name = i.first.substr(4, i.first.substr(4).find('/'));
+ const std::string key = i.first.substr(last_slash + 1);
+ set_config(module_name, key, i.second);
+ }
+}
ActivePyModules::~ActivePyModules() = default;
}
}
+bool ActivePyModules::get_store(const std::string &module_name,
+ const std::string &key, std::string *val) const
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ const std::string global_key = PyModuleRegistry::config_prefix
+ + module_name + "/" + key;
+
+ dout(4) << __func__ << "key: " << global_key << dendl;
+
+
+ Mutex::Locker lock(config_cache.lock);
+
+ if (config_cache.config.count(global_key)) {
+ *val = config_cache.config.at(global_key);
+ return true;
+ } else {
+ return false;
+ }
+}
+
bool ActivePyModules::get_config(const std::string &module_name,
const std::string &key, std::string *val) const
{
dout(4) << __func__ << " key: " << global_key << dendl;
- if (config_cache.count(global_key)) {
- *val = config_cache.at(global_key);
+
+ Mutex::Locker lock(config_cache.lock);
+
+ if (config_cache.config.count(global_key)) {
+ *val = config_cache.config.at(global_key);
return true;
} else {
return false;
dout(4) << __func__ << " prefix: " << global_prefix << dendl;
PyFormatter f;
- for (auto p = config_cache.lower_bound(global_prefix);
- p != config_cache.end() && p->first.find(global_prefix) == 0;
+
+ Mutex::Locker lock(config_cache.lock);
+
+ for (auto p = config_cache.config.lower_bound(global_prefix);
+ p != config_cache.config.end() && p->first.find(global_prefix) == 0;
++p) {
f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
}
return f.get();
}
-void ActivePyModules::set_config(const std::string &module_name,
+void ActivePyModules::set_store(const std::string &module_name,
const std::string &key, const boost::optional<std::string>& val)
{
const std::string global_key = PyModuleRegistry::config_prefix
+ module_name + "/" + key;
-
+
Command set_cmd;
{
PyThreadState *tstate = PyEval_SaveThread();
Mutex::Locker l(lock);
PyEval_RestoreThread(tstate);
+
+ Mutex::Locker lock(config_cache.lock);
+
if (val) {
- config_cache[global_key] = *val;
+ config_cache.config[global_key] = *val;
} else {
- config_cache.erase(global_key);
+ config_cache.config.erase(global_key);
}
std::ostringstream cmd_json;
}
}
+void ActivePyModules::set_config(const std::string &module_name,
+ const std::string &key, const boost::optional<std::string>& val)
+{
+ const std::string global_key = PyModuleRegistry::config_prefix
+ + module_name + "/" + key;
+
+ Command set_cmd;
+ {
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ Mutex::Locker lock(config_cache.lock);
+
+ if (val) {
+ config_cache.config[global_key] = *val;
+ } else {
+ config_cache.config.erase(global_key);
+ }
+
+ std::ostringstream cmd_json;
+ JSONFormatter jf;
+ jf.open_object_section("cmd");
+ if (val) {
+ jf.dump_string("prefix", "config set mgr");
+ jf.dump_string("key", global_key);
+ jf.dump_string("val", *val);
+ } else {
+ jf.dump_string("prefix", "config rm");
+ jf.dump_string("key", global_key);
+ }
+ jf.close_section();
+ jf.flush(cmd_json);
+ set_cmd.run(&monc, cmd_json.str());
+ }
+ set_cmd.wait();
+
+ if (set_cmd.r != 0) {
+ dout(0) << "`config set mgr" << global_key << " " << val << "` failed: "
+ << cpp_strerror(set_cmd.r) << dendl;
+ dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
+ }
+}
+
std::map<std::string, std::string> ActivePyModules::get_services() const
{
std::map<std::string, std::string> result;
class health_check_map_t;
-typedef std::map<std::string, std::string> PyModuleConfig;
-
class ActivePyModules
{
mutable Mutex lock{"ActivePyModules::lock"};
public:
- ActivePyModules(PyModuleConfig const &config_,
+ ActivePyModules(PyModuleConfig &module_config,
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
LogChannelRef clog_, Objecter &objecter_, Client &client_,
Finisher &f);
MonClient &get_monc() {return monc;}
Objecter &get_objecter() {return objecter;}
Client &get_client() {return client;}
-
PyObject *get_python(const std::string &what);
PyObject *get_server_python(const std::string &hostname);
PyObject *list_servers_python();
PyObject *get_context();
PyObject *get_osdmap();
- bool get_config(const std::string &module_name,
+ bool get_store(const std::string &module_name,
const std::string &key, std::string *val) const;
PyObject *get_config_prefix(const std::string &module_name,
const std::string &prefix) const;
+ void set_store(const std::string &module_name,
+ const std::string &key, const boost::optional<std::string> &val);
+
+ bool get_config(const std::string &module_name,
+ const std::string &key, std::string *val) const;
void set_config(const std::string &module_name,
const std::string &key, const boost::optional<std::string> &val);
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"
-
+#include "PyModule.h"
#include "Mgr.h"
#define dout_context g_ceph_context
}
}
-std::map<std::string, std::string> Mgr::load_config()
+PyModuleConfig Mgr::load_config()
{
assert(lock.is_locked_by_me());
lock.Lock();
assert(cmd.r == 0);
- std::map<std::string, std::string> loaded;
+ PyModuleConfig loaded;
for (auto &key_str : cmd.json_result.get_array()) {
std::string const key = key_str.get_str();
+
dout(20) << "saw key '" << key << "'" << dendl;
const std::string config_prefix = PyModuleRegistry::config_prefix;
get_cmd.wait();
lock.Lock();
assert(get_cmd.r == 0);
- loaded[key] = get_cmd.outbl.to_str();
+ loaded.config[key] = get_cmd.outbl.to_str();
}
}
monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
|CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
monc.set_messenger(client_messenger.get());
+
+ // We must register our config callback before calling init(), so
+ // that we see the initial configuration message
+ monc.register_config_callback([this](const std::string &k, const std::string &v){
+ dout(10) << "config_callback: " << k << " : " << v << dendl;
+ if (k.substr(0, 4) == "mgr/") {
+ const std::string global_key = PyModuleRegistry::config_prefix + k.substr(4);
+ py_module_registry.handle_config(global_key, v);
+
+ return true;
+ }
+ return false;
+ });
+ dout(4) << "Registered monc callback" << dendl;
+
int r = monc.init();
if (r < 0) {
monc.shutdown();
class MMgrMap;
class Mgr;
+class PyModuleConfig;
class MgrStandby : public Dispatcher,
public md_config_obs_t {
#endif
}
+PyModuleConfig::PyModuleConfig() = default;
+
+PyModuleConfig::PyModuleConfig(PyModuleConfig &mconfig)
+ : config(mconfig.config)
+{}
+
+PyModuleConfig::~PyModuleConfig() = default;
std::string PyModule::get_site_packages()
{
typedef std::shared_ptr<PyModule> PyModuleRef;
+class PyModuleConfig {
+public:
+ mutable Mutex lock{"PyModuleConfig::lock"};
+ std::map<std::string, std::string> config;
+
+ PyModuleConfig();
+
+ PyModuleConfig(PyModuleConfig &mconfig);
+
+ ~PyModuleConfig();
+};
dout(4) << "Starting modules in standby mode" << dendl;
- standby_modules.reset(new StandbyPyModules(monc, mgr_map, clog));
+ standby_modules.reset(new StandbyPyModules(
+ monc, mgr_map, module_config, clog));
std::set<std::string> failed_modules;
for (const auto &i : modules) {
}
void PyModuleRegistry::active_start(
- PyModuleConfig &config_,
+ PyModuleConfig &module_config,
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
LogChannelRef clog_, Objecter &objecter_, Client &client_,
Finisher &f)
}
active_modules.reset(new ActivePyModules(
- config_, ds, cs, mc, clog_, objecter_, client_, f));
+ module_config, ds, cs, mc, clog_, objecter_, client_, f));
for (const auto &i : modules) {
// Anything we're skipping because of !can_run will be flagged
}
}
+void PyModuleRegistry::handle_config(const std::string &k, const std::string &v)
+{
+ Mutex::Locker l(module_config.lock);
+
+ if (!v.empty()) {
+ module_config.config[k] = v;
+ } else {
+ module_config.config.erase(k);
+ }
+}
+
#include "ActivePyModules.h"
#include "StandbyPyModules.h"
-
-
/**
* This class is responsible for setting up the python runtime environment
* and importing the python modules.
{
private:
mutable Mutex lock{"PyModuleRegistry::lock"};
-
LogChannelRef clog;
std::map<std::string, PyModuleRef> modules;
*/
std::set<std::string> probe_modules() const;
+ PyModuleConfig module_config;
+
public:
static std::string config_prefix;
+ void handle_config(const std::string &k, const std::string &v);
+
/**
* Get references to all modules (whether they have loaded and/or
* errored) or not.
void init();
void active_start(
- PyModuleConfig &config_,
+ PyModuleConfig &module_config,
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
LogChannelRef clog_, Objecter &objecter_, Client &client_,
Finisher &f);
#define dout_prefix *_dout << "mgr " << __func__ << " "
-StandbyPyModules::StandbyPyModules(MonClient *monc_, const MgrMap &mgr_map_,
- LogChannelRef clog_)
- : monc(monc_), load_config_thread(monc, &state), clog(clog_)
+StandbyPyModules::StandbyPyModules(
+ MonClient *monc_, const MgrMap &mgr_map_,
+ PyModuleConfig &module_config, LogChannelRef clog_)
+ : monc(monc_),
+ state(module_config),
+ clog(clog_)
{
state.set_mgr_map(mgr_map_);
}
{
Mutex::Locker locker(lock);
- if (!state.is_config_loaded && load_config_thread.is_started()) {
- // FIXME: handle cases where initial load races with shutdown
- // this is actually not super rare because
- assert(0);
- //load_config_thread.kill(SIGKILL);
- }
-
// Signal modules to drop out of serve() and/or tear down resources
for (auto &i : modules) {
auto module = i.second.get();
state,
py_module, clog));
- if (modules.size() == 1) {
- load_config_thread.create("LoadConfig");
- }
-
int r = modules[module_name]->load();
if (r != 0) {
modules.erase(module_name);
}
}
-void *StandbyPyModules::LoadConfigThread::entry()
-{
- dout(10) << "listing keys" << dendl;
- JSONCommand cmd;
- cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
- cmd.wait();
- assert(cmd.r == 0);
-
- std::map<std::string, std::string> loaded;
-
- for (auto &key_str : cmd.json_result.get_array()) {
- std::string const key = key_str.get_str();
- dout(20) << "saw key '" << key << "'" << dendl;
-
- const std::string config_prefix = PyModuleRegistry::config_prefix;
-
- if (key.substr(0, config_prefix.size()) == config_prefix) {
- dout(20) << "fetching '" << key << "'" << dendl;
- Command get_cmd;
- std::ostringstream cmd_json;
- cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
- get_cmd.run(monc, cmd_json.str());
- get_cmd.wait();
- assert(get_cmd.r == 0);
- loaded[key] = get_cmd.outbl.to_str();
- }
- }
- state->loaded_config(loaded);
-
- return nullptr;
-}
-
bool StandbyPyModule::get_config(const std::string &key,
std::string *value) const
{
+ get_name() + "/" + key;
dout(4) << __func__ << " key: " << global_key << dendl;
-
+
return state.with_config([global_key, value](const PyModuleConfig &config){
- if (config.count(global_key)) {
- *value = config.at(global_key);
+ if (config.config.count(global_key)) {
+ *value = config.config.at(global_key);
return true;
} else {
return false;
#include "mon/MgrMap.h"
#include "mgr/PyModuleRunner.h"
-typedef std::map<std::string, std::string> PyModuleConfig;
+//typedef std::map<std::string, std::string> PyModuleConfig;
/**
* State that is read by all modules running in standby mode
mutable Mutex lock{"StandbyPyModuleState::lock"};
MgrMap mgr_map;
- PyModuleConfig config_cache;
+ //PyModuleConfig config_cache;
- mutable Cond config_loaded;
+
public:
- bool is_config_loaded = false;
+ PyModuleConfig &module_config;
+
+ StandbyPyModuleState(PyModuleConfig &module_config_)
+ : module_config(module_config_)
+ {}
void set_mgr_map(const MgrMap &mgr_map_)
{
mgr_map = mgr_map_;
}
- void loaded_config(const PyModuleConfig &config_)
- {
- Mutex::Locker l(lock);
-
- config_cache = config_;
- is_config_loaded = true;
- config_loaded.Signal();
- }
-
template<typename Callback, typename...Args>
void with_mgr_map(Callback&& cb, Args&&...args) const
{
template<typename Callback, typename...Args>
auto with_config(Callback&& cb, Args&&... args) const ->
- decltype(cb(config_cache, std::forward<Args>(args)...)) {
+ decltype(cb(module_config, std::forward<Args>(args)...)) {
Mutex::Locker l(lock);
- if (!is_config_loaded) {
- config_loaded.Wait(lock);
- }
-
- return std::forward<Callback>(cb)(config_cache, std::forward<Args>(args)...);
+ return std::forward<Callback>(cb)(module_config, std::forward<Args>(args)...);
}
};
StandbyPyModuleState state;
- void load_config();
- class LoadConfigThread : public Thread
- {
- protected:
- MonClient *monc;
- StandbyPyModuleState *state;
- public:
- LoadConfigThread(MonClient *monc_, StandbyPyModuleState *state_)
- : monc(monc_), state(state_)
- {}
- void *entry() override;
- };
-
- LoadConfigThread load_config_thread;
-
LogChannelRef clog;
public:
StandbyPyModules(
MonClient *monc_,
const MgrMap &mgr_map_,
+ PyModuleConfig &module_config,
LogChannelRef clog_);
int start_one(PyModuleRef py_module);
cmd_getval(g_ceph_context, cmdmap, "value", value);
if (prefix == "config set") {
- const Option *opt = g_conf->find_option(name);
- if (!opt) {
- ss << "unrecognized config option '" << name << "'";
- err = -EINVAL;
- goto reply;
- }
-
- Option::value_t real_value;
- string errstr;
- err = opt->parse_value(value, &real_value, &errstr, &value);
- if (err < 0) {
- ss << "error parsing value: " << errstr;
- goto reply;
+ if (name.substr(0, 4) != "mgr/") {
+ const Option *opt = g_conf->find_option(name);
+ if (!opt) {
+ ss << "unrecognized config option '" << name << "'";
+ err = -EINVAL;
+ goto reply;
+ }
+
+ Option::value_t real_value;
+ string errstr;
+ err = opt->parse_value(value, &real_value, &errstr, &value);
+ if (err < 0) {
+ ss << "error parsing value: " << errstr;
+ goto reply;
+ }
}
}
who = key.substr(0, last_slash);
}
- const Option *opt = g_conf->find_option(name);
- if (!opt) {
- dout(10) << __func__ << " unrecognized option '" << name << "'" << dendl;
- opt = new Option(name, Option::TYPE_STR, Option::LEVEL_UNKNOWN);
- }
- string err;
- int r = opt->pre_validate(&value, &err);
- if (r < 0) {
- dout(10) << __func__ << " pre-validate failed on '" << name << "' = '"
- << value << "' for " << name << dendl;
- }
-
string section_name;
- MaskedOption mopt(opt);
- mopt.raw_value = value;
- if (who.size() &&
- !ConfigMap::parse_mask(who, §ion_name, &mopt.mask)) {
- derr << __func__ << " ignoring key " << key << dendl;
- } else {
+ if (key.find("mgr") == 0) {
+ name = key.substr(key.find('/') + 1);
+ MaskedOption mopt(new Option(name, Option::TYPE_STR, Option::LEVEL_UNKNOWN));
+ mopt.raw_value = value;
Section *section = &config_map.global;;
+ section_name = "mgr";
if (section_name.size()) {
if (section_name.find('.') != std::string::npos) {
section = &config_map.by_id[section_name];
}
}
section->options.insert(make_pair(name, std::move(mopt)));
- ++num;
+ ++num;
+ } else {
+ const Option *opt = g_conf->find_option(name);
+ if (!opt) {
+ dout(10) << __func__ << " unrecognized option '" << name << "'" << dendl;
+ opt = new Option(name, Option::TYPE_STR, Option::LEVEL_UNKNOWN);
+ }
+ string err;
+ int r = opt->pre_validate(&value, &err);
+ if (r < 0) {
+ dout(10) << __func__ << " pre-validate failed on '" << name << "' = '"
+ << value << "' for " << name << dendl;
+ }
+
+ MaskedOption mopt(opt);
+ mopt.raw_value = value;
+ if (who.size() &&
+ !ConfigMap::parse_mask(who, §ion_name, &mopt.mask)) {
+ derr << __func__ << " ignoring key " << key << dendl;
+ } else {
+ Section *section = &config_map.global;;
+ if (section_name.size()) {
+ if (section_name.find('.') != std::string::npos) {
+ section = &config_map.by_id[section_name];
+ } else {
+ section = &config_map.by_type[section_name];
+ }
+ }
+ section->options.insert(make_pair(name, std::move(mopt)));
+ ++num;
+ }
}
it->next();
}
osdmap.crush.get(),
string(), // no device class
&out);
- g_conf->set_mon_vals(g_ceph_context, out);
+ g_conf->set_mon_vals(g_ceph_context, out, nullptr);
}
}
bool ConfigMonitor::refresh_config(MonSession *s)
{
const OSDMap& osdmap = mon->osdmon()->osdmap;
-
map<string,string> crush_location;
if (s->remote_host.size()) {
osdmap.crush->get_full_location(s->remote_host, &crush_location);
cct_->_conf->get_val<double>("mon_client_hunt_interval_min_multiple")),
last_mon_command_tid(0),
version_req_id(0)
-{
-}
+{}
MonClient::~MonClient()
{
decode(monmap, p);
ldout(cct, 10) << " got monmap " << monmap.epoch
- << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
- << dendl;
+ << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
+ << dendl;
ldout(cct, 10) << "dump:\n";
monmap.print(*_dout);
*_dout << dendl;
void MonClient::handle_config(MConfig *m)
{
ldout(cct,10) << __func__ << " " << *m << dendl;
- cct->_conf->set_mon_vals(cct, m->config);
+ cct->_conf->set_mon_vals(cct, m->config, config_cb);
m->put();
got_config = true;
map_cond.Signal();
ldout(cct, 5) << "already authenticated" << dendl;
return 0;
}
-
_sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
_sub_want("config", 0, 0);
if (!_opened())
}
return ret;
}
+
+void MonClient::register_config_callback(md_config_t::config_callback fn) {
+ assert(!config_cb);
+ config_cb = fn;
+}
+
+md_config_t::config_callback MonClient::get_config_callback() {
+ return config_cb;
+}
* Foundation. See file COPYING.
*
*/
-
#ifndef CEPH_MONCLIENT_H
#define CEPH_MONCLIENT_H
#include "common/Finisher.h"
#include "common/config.h"
-
class MMonMap;
class MConfig;
class MMonGetVersionReply;
class MonClient : public Dispatcher {
public:
MonMap monmap;
+ map<string,string> config_mgr;
private:
Messenger *messenger;
std::unique_ptr<Context> session_established_context;
bool had_a_connection;
double reopen_interval_multiplier;
-
+
bool _opened() const;
bool _hunting() const;
void _start_hunting();
// admin commands
private:
uint64_t last_mon_command_tid;
+
struct MonCommand {
string target_name;
int target_rank;
* @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
*/
void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
-
/**
* Run a callback within our lock, with a reference
* to the MonMap
return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
}
+ void register_config_callback(md_config_t::config_callback fn);
+ md_config_t::config_callback get_config_callback();
+
private:
struct version_req_d {
Context *context;
ceph_tid_t version_req_id;
void handle_get_version_reply(MMonGetVersionReply* m);
-
+ md_config_t::config_callback config_cb;
};
#endif