From: Sage Weil
Date: Tue, 16 Feb 2021 19:47:23 +0000 (-0500)
Subject: mgr: use new kv subscription for mgr/, device/, config/
X-Git-Tag: v17.1.0~2900^2~9
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=edf1ea2f50927b97a4271c6754f855421dd4d4d0;p=ceph.git

mgr: use new kv subscription for mgr/, device/, config/

Include the config/ prefix (which we weren't loading before).

Before we are active, we collect these changes, and then pass them to the ActivePyModules ctor.

No change in functionality here, except that when we make a change from a mgr module, we'll (1) put it in our local cache store, (2) send the mon command to set it, and (3) get a notification that updates the same value. Since this whole process is synchronous (see ActivePyModules::set_store()), and the notification will generally arrive *before* the command ack, there is no change in behavior.

If the mon cluster is not yet pacific, we still need to load kv values the old way. If it's a mixed-mon cluster (and, e.g., our current mon has the feature but not all of them do), we'll get this data both ways, but no harm is done.

Signed-off-by: Sage Weil --- diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index dc10235e70a85..7c81c736a50c6 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -37,18 +37,20 @@ #undef dout_prefix #define dout_prefix *_dout << "mgr " << __func__ << " " -ActivePyModules::ActivePyModules(PyModuleConfig &module_config_, - std::map store_data, - DaemonStateIndex &ds, ClusterState &cs, - MonClient &mc, LogChannelRef clog_, - LogChannelRef audit_clog_, Objecter &objecter_, - Client &client_, Finisher &f, DaemonServer &server, - PyModuleRegistry &pmr) - : module_config(module_config_), daemon_state(ds), cluster_state(cs), - monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_), - client(client_), finisher(f), - cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"), - server(server), py_module_registry(pmr) +ActivePyModules::ActivePyModules( + PyModuleConfig &module_config_, + std::map store_data, + bool mon_provides_kv_sub, + DaemonStateIndex &ds, ClusterState &cs, + MonClient &mc, LogChannelRef clog_, + LogChannelRef audit_clog_, Objecter &objecter_, + Client &client_, Finisher &f, DaemonServer &server, + PyModuleRegistry &pmr) +: module_config(module_config_), daemon_state(ds), cluster_state(cs), + monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_), + client(client_), finisher(f), + cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"), + server(server), py_module_registry(pmr) { store_cache = std::move(store_data); cmd_finisher.start(); @@ -703,6 +705,9 @@ void ActivePyModules::set_store(const std::string &module_name, Command set_cmd; { std::lock_guard l(lock); + + // NOTE: this isn't strictly necessary since we'll also get an MKVData + // update from the mon due to our subscription *before* our command is acked. 
if (val) { store_cache[global_key] = *val; } else { @@ -756,6 +761,33 @@ std::map ActivePyModules::get_services() const return result; } +void ActivePyModules::update_kv_data( + const std::string prefix, + bool incremental, + const map, std::less<>>& data) +{ + std::lock_guard l(lock); + if (!incremental) { + dout(10) << "full update on " << prefix << dendl; + auto p = store_cache.lower_bound(prefix); + while (p != store_cache.end() && p->first.find(prefix) == 0) { + dout(20) << " rm prior " << p->first << dendl; + p = store_cache.erase(p); + } + } else { + dout(10) << "incremental update on " << prefix << dendl; + } + for (auto& i : data) { + if (i.second) { + dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl; + store_cache[i.first] = i.second->to_str(); + } else { + dout(20) << " rm " << i.first << dendl; + store_cache.erase(i.first); + } + } +} + PyObject* ActivePyModules::with_perf_counters( std::function fct, const std::string &svc_name, diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index ad6f50219268b..b59c0dff36c92 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -44,6 +44,7 @@ class ActivePyModules // module class instances already created std::map> modules; PyModuleConfig &module_config; + bool have_local_config_map = false; std::map store_cache; DaemonStateIndex &daemon_state; ClusterState &cluster_state; @@ -63,11 +64,13 @@ private: mutable ceph::mutex lock = ceph::make_mutex("ActivePyModules::lock"); public: - ActivePyModules(PyModuleConfig &module_config, - std::map store_data, - DaemonStateIndex &ds, ClusterState &cs, MonClient &mc, - LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_, - Finisher &f, DaemonServer &server, PyModuleRegistry &pmr); + ActivePyModules( + PyModuleConfig &module_config, + std::map store_data, + bool mon_provides_kv_sub, + DaemonStateIndex &ds, ClusterState &cs, MonClient &mc, + LogChannelRef clog_, LogChannelRef audit_clog_, 
Objecter &objecter_, Client &client_, + Finisher &f, DaemonServer &server, PyModuleRegistry &pmr); ~ActivePyModules(); @@ -162,6 +165,11 @@ public: std::map get_services() const; + void update_kv_data( + const std::string prefix, + bool incremental, + const map, std::less<>>& data); + // Public so that MonCommandCompletion can use it // FIXME: for send_command completion notifications, // send it to only the module that sent the command, not everyone diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index 4af58dab8f74c..26422e150e797 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -29,6 +29,7 @@ #include "messages/MCommandReply.h" #include "messages/MLog.h" #include "messages/MServiceMap.h" +#include "messages/MKVData.h" #include "PyModule.h" #include "Mgr.h" @@ -178,11 +179,11 @@ std::map Mgr::load_store() dout(20) << "saw key '" << key << "'" << dendl; - const std::string config_prefix = PyModule::config_prefix; + const std::string store_prefix = PyModule::config_prefix; const std::string device_prefix = "device/"; - if (key.substr(0, config_prefix.size()) == config_prefix || - key.substr(0, device_prefix.size()) == device_prefix) { + if (key.substr(0, device_prefix.size()) == device_prefix || + key.substr(0, store_prefix.size()) == store_prefix) { dout(20) << "fetching '" << key << "'" << dendl; Command get_cmd; std::ostringstream cmd_json; @@ -192,26 +193,7 @@ std::map Mgr::load_store() get_cmd.wait(); lock.lock(); if (get_cmd.r == 0) { // tolerate racing config-key change - if (key.substr(0, device_prefix.size()) == device_prefix) { - // device/ - string devid = key.substr(device_prefix.size()); - map meta; - ostringstream ss; - string val = get_cmd.outbl.to_str(); - int r = get_json_str_map(val, ss, &meta, false); - if (r < 0) { - derr << __func__ << " failed to parse " << val << ": " << ss.str() - << dendl; - } else { - daemon_state.with_device_create( - devid, [&meta] (DeviceState& dev) { - dev.set_metadata(std::move(meta)); - }); - } - } else { - // config/ 
- loaded[key] = get_cmd.outbl.to_str(); - } + loaded[key] = get_cmd.outbl.to_str(); } } } @@ -246,11 +228,26 @@ void Mgr::init() register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal); register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal); + // Only pacific+ monitors support subscribe to kv updates + bool mon_allows_kv_sub = false; + monc->with_monmap( + [&](const MonMap &monmap) { + if (monmap.get_required_features().contains_all( + ceph::features::mon::FEATURE_PACIFIC)) { + mon_allows_kv_sub = true; + } + }); + // subscribe to all the maps monc->sub_want("log-info", 0, 0); monc->sub_want("mgrdigest", 0, 0); monc->sub_want("fsmap", 0, 0); monc->sub_want("servicemap", 0, 0); + if (mon_allows_kv_sub) { + monc->sub_want("kv:config/", 0, 0); + monc->sub_want("kv:mgr/", 0, 0); + monc->sub_want("kv:device/", 0, 0); + } dout(4) << "waiting for OSDMap..." << dendl; // Subscribe to OSDMap update to pass on to ClusterState @@ -299,20 +296,45 @@ void Mgr::init() dout(4) << "waiting for FSMap..." << dendl; fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();}); - dout(4) << "waiting for config-keys..." << dendl; - // Wait for MgrDigest... dout(4) << "waiting for MgrDigest..." << dendl; digest_cond.wait(l, [this] { return digest_received; }); - // Load module KV store - auto kv_store = load_store(); + if (!mon_allows_kv_sub) { + dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl; + pre_init_store = load_store(); + } + dout(4) << "initializing device state..." << dendl; + // Note: we only have to do this during startup because once we are + // active the only changes to this state will originate from one of our + // own modules. 
+ for (auto p = pre_init_store.lower_bound("device/"); + p != pre_init_store.end() && p->first.find("device/") == 0; + ++p) { + string devid = p->first.substr(7); + dout(10) << " updating " << devid << dendl; + map meta; + ostringstream ss; + int r = get_json_str_map(p->second, ss, &meta, false); + if (r < 0) { + derr << __func__ << " failed to parse " << p->second << ": " << ss.str() + << dendl; + } else { + daemon_state.with_device_create( + devid, [&meta] (DeviceState& dev) { + dev.set_metadata(std::move(meta)); + }); + } + } + // assume finisher already initialized in background_init dout(4) << "starting python modules..." << dendl; - py_module_registry->active_start(daemon_state, cluster_state, - kv_store, *monc, clog, audit_clog, *objecter, *client, - finisher, server); + py_module_registry->active_start( + daemon_state, cluster_state, + pre_init_store, mon_allows_kv_sub, + *monc, clog, audit_clog, *objecter, *client, + finisher, server); cluster_state.final_init(); @@ -567,6 +589,43 @@ bool Mgr::ms_dispatch2(const ref_t& m) case MSG_LOG: handle_log(ref_cast(m)); break; + case MSG_KV_DATA: + { + auto msg = ref_cast(m); + monc->sub_got("kv:"s + msg->prefix, msg->version); + if (!msg->data.empty()) { + if (initialized) { + py_module_registry->update_kv_data( + msg->prefix, + msg->incremental, + msg->data + ); + } else { + // before we have created the ActivePyModules, we need to + // track the store regions we're monitoring + if (!msg->incremental) { + dout(10) << "full update on " << msg->prefix << dendl; + auto p = pre_init_store.lower_bound(msg->prefix); + while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) { + dout(20) << " rm prior " << p->first << dendl; + p = pre_init_store.erase(p); + } + } else { + dout(10) << "incremental update on " << msg->prefix << dendl; + } + for (auto& i : msg->data) { + if (i.second) { + dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl; + pre_init_store[i.first] = i.second->to_str(); + } 
else { + dout(20) << " rm " << i.first << dendl; + pre_init_store.erase(i.first); + } + } + } + } + } + break; default: return false; diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h index 157105d788e7c..28a7da93de083 100644 --- a/src/mgr/Mgr.h +++ b/src/mgr/Mgr.h @@ -61,6 +61,8 @@ protected: LogChannelRef clog; LogChannelRef audit_clog; + std::map pre_init_store; + void load_all_metadata(); std::map load_store(); void init(); diff --git a/src/mgr/PyModuleRegistry.cc b/src/mgr/PyModuleRegistry.cc index 221b93beed7b4..1ae44143c7d80 100644 --- a/src/mgr/PyModuleRegistry.cc +++ b/src/mgr/PyModuleRegistry.cc @@ -183,6 +183,7 @@ void PyModuleRegistry::standby_start(MonClient &mc, Finisher &f) void PyModuleRegistry::active_start( DaemonStateIndex &ds, ClusterState &cs, const std::map &kv_store, + bool mon_provides_kv_sub, MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_, Finisher &f, DaemonServer &server) @@ -202,10 +203,13 @@ void PyModuleRegistry::active_start( standby_modules.reset(); } - active_modules.reset(new ActivePyModules( - module_config, kv_store, ds, cs, mc, - clog_, audit_clog_, objecter_, client_, f, server, - *this)); + active_modules.reset( + new ActivePyModules( + module_config, + kv_store, mon_provides_kv_sub, + ds, cs, mc, + clog_, audit_clog_, objecter_, client_, f, server, + *this)); for (const auto &i : modules) { // Anything we're skipping because of !can_run will be flagged diff --git a/src/mgr/PyModuleRegistry.h b/src/mgr/PyModuleRegistry.h index c16c4bf4c8ea6..a5f38e8f65a27 100644 --- a/src/mgr/PyModuleRegistry.h +++ b/src/mgr/PyModuleRegistry.h @@ -66,6 +66,14 @@ public: void handle_config(const std::string &k, const std::string &v); void handle_config_notify(); + void update_kv_data( + const std::string prefix, + bool incremental, + const map, std::less<>>& data) { + ceph_assert(active_modules); + active_modules->update_kv_data(prefix, incremental, data); + } + /** * Get references to all modules 
(whether they have loaded and/or * errored) or not. @@ -99,6 +107,7 @@ public: void active_start( DaemonStateIndex &ds, ClusterState &cs, const std::map &kv_store, + bool mon_provides_kv_sub, MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_, Finisher &f, DaemonServer &server);