]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: use new kv subscription for mgr/, device/, config/
authorSage Weil <sage@newdream.net>
Tue, 16 Feb 2021 19:47:23 +0000 (14:47 -0500)
committerSage Weil <sage@newdream.net>
Wed, 17 Feb 2021 17:27:58 +0000 (12:27 -0500)
Include the config/ prefix (which we weren't loading before).

Before we are active, we collect these changes, and then pass them to
the ActivePyModules ctor.  No change in functionality here, except that
when we make a change from a mgr module, we'll (1) put it in our local
cache store, (2) send the mon command to set it, and (3) get a notification
that updates the same value.  Since this whole process is synchronous (see
ActivePyModules::set_store()), and the notification will generally arrive
*before* the command ack, there is no change in behavior.

If the mon cluster is not yet pacific, we still need to load
kv values the old way.  If it's a mixed-mon cluster (and, e.g., our
current mon has the feature but not all of them do), we'll get this data
both ways, but no harm is done.

Signed-off-by: Sage Weil <sage@newdream.net>
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/Mgr.cc
src/mgr/Mgr.h
src/mgr/PyModuleRegistry.cc
src/mgr/PyModuleRegistry.h

index dc10235e70a8506cdab073310aeea06b753733c3..7c81c736a50c6faf4596d8d7050f5054252e6ffc 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 
-ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
-          std::map<std::string, std::string> store_data,
-          DaemonStateIndex &ds, ClusterState &cs,
-          MonClient &mc, LogChannelRef clog_,
-          LogChannelRef audit_clog_, Objecter &objecter_,
-          Client &client_, Finisher &f, DaemonServer &server,
-          PyModuleRegistry &pmr)
-  : module_config(module_config_), daemon_state(ds), cluster_state(cs),
-    monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
-    client(client_), finisher(f),
-    cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
-    server(server), py_module_registry(pmr)
+ActivePyModules::ActivePyModules(
+  PyModuleConfig &module_config_,
+  std::map<std::string, std::string> store_data,
+  bool mon_provides_kv_sub,
+  DaemonStateIndex &ds, ClusterState &cs,
+  MonClient &mc, LogChannelRef clog_,
+  LogChannelRef audit_clog_, Objecter &objecter_,
+  Client &client_, Finisher &f, DaemonServer &server,
+  PyModuleRegistry &pmr)
+: module_config(module_config_), daemon_state(ds), cluster_state(cs),
+  monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
+  client(client_), finisher(f),
+  cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
+  server(server), py_module_registry(pmr)
 {
   store_cache = std::move(store_data);
   cmd_finisher.start();
@@ -703,6 +705,9 @@ void ActivePyModules::set_store(const std::string &module_name,
   Command set_cmd;
   {
     std::lock_guard l(lock);
+
+    // NOTE: this isn't strictly necessary since we'll also get an MKVData
+    // update from the mon due to our subscription *before* our command is acked.
     if (val) {
       store_cache[global_key] = *val;
     } else {
@@ -756,6 +761,33 @@ std::map<std::string, std::string> ActivePyModules::get_services() const
   return result;
 }
 
+void ActivePyModules::update_kv_data(
+  const std::string prefix,
+  bool incremental,
+  const map<std::string, boost::optional<bufferlist>, std::less<>>& data)
+{
+  std::lock_guard l(lock);
+  if (!incremental) {
+    dout(10) << "full update on " << prefix << dendl;
+    auto p = store_cache.lower_bound(prefix);
+    while (p != store_cache.end() && p->first.find(prefix) == 0) {
+      dout(20) << " rm prior " << p->first << dendl;
+      p = store_cache.erase(p);
+    }
+  } else {
+    dout(10) << "incremental update on " << prefix << dendl;
+  }
+  for (auto& i : data) {
+    if (i.second) {
+      dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+      store_cache[i.first] = i.second->to_str();
+    } else {
+      dout(20) << " rm " << i.first << dendl;
+      store_cache.erase(i.first);
+    }
+  }
+}
+
 PyObject* ActivePyModules::with_perf_counters(
     std::function<void(PerfCounterInstance& counter_instance, PerfCounterType& counter_type, PyFormatter& f)> fct,
     const std::string &svc_name,
index ad6f50219268b76b6152697bf8aea9c3194d880e..b59c0dff36c921d0501294fda02cc8f8737242df 100644 (file)
@@ -44,6 +44,7 @@ class ActivePyModules
   // module class instances already created
   std::map<std::string, std::shared_ptr<ActivePyModule>> modules;
   PyModuleConfig &module_config;
+  bool have_local_config_map = false;
   std::map<std::string, std::string> store_cache;
   DaemonStateIndex &daemon_state;
   ClusterState &cluster_state;
@@ -63,11 +64,13 @@ private:
   mutable ceph::mutex lock = ceph::make_mutex("ActivePyModules::lock");
 
 public:
-  ActivePyModules(PyModuleConfig &module_config,
-            std::map<std::string, std::string> store_data,
-            DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
-            LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
-            Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
+  ActivePyModules(
+    PyModuleConfig &module_config,
+    std::map<std::string, std::string> store_data,
+    bool mon_provides_kv_sub,
+    DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+    LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
+    Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
 
   ~ActivePyModules();
 
@@ -162,6 +165,11 @@ public:
 
   std::map<std::string, std::string> get_services() const;
 
+  void update_kv_data(
+    const std::string prefix,
+    bool incremental,
+    const map<std::string, boost::optional<bufferlist>, std::less<>>& data);
+
   // Public so that MonCommandCompletion can use it
   // FIXME: for send_command completion notifications,
   // send it to only the module that sent the command, not everyone
index 4af58dab8f74cbb5a0ca15c210270a0aca31565c..26422e150e7979b54b2baf4d9bf1e067ef6fcda6 100644 (file)
@@ -29,6 +29,7 @@
 #include "messages/MCommandReply.h"
 #include "messages/MLog.h"
 #include "messages/MServiceMap.h"
+#include "messages/MKVData.h"
 #include "PyModule.h"
 #include "Mgr.h"
 
@@ -178,11 +179,11 @@ std::map<std::string, std::string> Mgr::load_store()
     
     dout(20) << "saw key '" << key << "'" << dendl;
 
-    const std::string config_prefix = PyModule::config_prefix;
+    const std::string store_prefix = PyModule::config_prefix;
     const std::string device_prefix = "device/";
 
-    if (key.substr(0, config_prefix.size()) == config_prefix ||
-       key.substr(0, device_prefix.size()) == device_prefix) {
+    if (key.substr(0, device_prefix.size()) == device_prefix ||
+       key.substr(0, store_prefix.size()) == store_prefix) {
       dout(20) << "fetching '" << key << "'" << dendl;
       Command get_cmd;
       std::ostringstream cmd_json;
@@ -192,26 +193,7 @@ std::map<std::string, std::string> Mgr::load_store()
       get_cmd.wait();
       lock.lock();
       if (get_cmd.r == 0) { // tolerate racing config-key change
-       if (key.substr(0, device_prefix.size()) == device_prefix) {
-         // device/
-         string devid = key.substr(device_prefix.size());
-         map<string,string> meta;
-         ostringstream ss;
-         string val = get_cmd.outbl.to_str();
-         int r = get_json_str_map(val, ss, &meta, false);
-         if (r < 0) {
-           derr << __func__ << " failed to parse " << val << ": " << ss.str()
-                << dendl;
-         } else {
-           daemon_state.with_device_create(
-             devid, [&meta] (DeviceState& dev) {
-               dev.set_metadata(std::move(meta));
-             });
-         }
-       } else {
-         // config/
-         loaded[key] = get_cmd.outbl.to_str();
-       }
+       loaded[key] = get_cmd.outbl.to_str();
       }
     }
   }
@@ -246,11 +228,26 @@ void Mgr::init()
   register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
   register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
 
+  // Only pacific+ monitors support subscribe to kv updates
+  bool mon_allows_kv_sub = false;
+  monc->with_monmap(
+    [&](const MonMap &monmap) {
+      if (monmap.get_required_features().contains_all(
+           ceph::features::mon::FEATURE_PACIFIC)) {
+       mon_allows_kv_sub = true;
+      }
+    });
+
   // subscribe to all the maps
   monc->sub_want("log-info", 0, 0);
   monc->sub_want("mgrdigest", 0, 0);
   monc->sub_want("fsmap", 0, 0);
   monc->sub_want("servicemap", 0, 0);
+  if (mon_allows_kv_sub) {
+    monc->sub_want("kv:config/", 0, 0);
+    monc->sub_want("kv:mgr/", 0, 0);
+    monc->sub_want("kv:device/", 0, 0);
+  }
 
   dout(4) << "waiting for OSDMap..." << dendl;
   // Subscribe to OSDMap update to pass on to ClusterState
@@ -299,20 +296,45 @@ void Mgr::init()
   dout(4) << "waiting for FSMap..." << dendl;
   fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
 
-  dout(4) << "waiting for config-keys..." << dendl;
-
   // Wait for MgrDigest...
   dout(4) << "waiting for MgrDigest..." << dendl;
   digest_cond.wait(l, [this] { return digest_received; });
 
-  // Load module KV store
-  auto kv_store = load_store();
+  if (!mon_allows_kv_sub) {
+    dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl;
+    pre_init_store = load_store();
+  }
 
+  dout(4) << "initializing device state..." << dendl;
+  // Note: we only have to do this during startup because once we are
+  // active the only changes to this state will originate from one of our
+  // own modules.
+  for (auto p = pre_init_store.lower_bound("device/");
+       p != pre_init_store.end() && p->first.find("device/") == 0;
+       ++p) {
+    string devid = p->first.substr(7);
+    dout(10) << "  updating " << devid << dendl;
+    map<string,string> meta;
+    ostringstream ss;
+    int r = get_json_str_map(p->second, ss, &meta, false);
+    if (r < 0) {
+      derr << __func__ << " failed to parse " << p->second << ": " << ss.str()
+          << dendl;
+    } else {
+      daemon_state.with_device_create(
+       devid, [&meta] (DeviceState& dev) {
+                dev.set_metadata(std::move(meta));
+              });
+    }
+  }
+  
   // assume finisher already initialized in background_init
   dout(4) << "starting python modules..." << dendl;
-  py_module_registry->active_start(daemon_state, cluster_state,
-      kv_store, *monc, clog, audit_clog, *objecter, *client,
-      finisher, server);
+  py_module_registry->active_start(
+    daemon_state, cluster_state,
+    pre_init_store, mon_allows_kv_sub,
+    *monc, clog, audit_clog, *objecter, *client,
+    finisher, server);
 
   cluster_state.final_init();
 
@@ -567,6 +589,43 @@ bool Mgr::ms_dispatch2(const ref_t<Message>& m)
     case MSG_LOG:
       handle_log(ref_cast<MLog>(m));
       break;
+    case MSG_KV_DATA:
+      {
+       auto msg = ref_cast<MKVData>(m);
+       monc->sub_got("kv:"s + msg->prefix, msg->version);
+       if (!msg->data.empty()) {
+         if (initialized) {
+           py_module_registry->update_kv_data(
+             msg->prefix,
+             msg->incremental,
+             msg->data
+             );
+         } else {
+           // before we have created the ActivePyModules, we need to
+           // track the store regions we're monitoring
+           if (!msg->incremental) {
+             dout(10) << "full update on " << msg->prefix << dendl;
+             auto p = pre_init_store.lower_bound(msg->prefix);
+             while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) {
+               dout(20) << " rm prior " << p->first << dendl;
+               p = pre_init_store.erase(p);
+             }
+           } else {
+             dout(10) << "incremental update on " << msg->prefix << dendl;
+           }
+           for (auto& i : msg->data) {
+             if (i.second) {
+               dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+               pre_init_store[i.first] = i.second->to_str();
+             } else {
+               dout(20) << " rm " << i.first << dendl;
+               pre_init_store.erase(i.first);
+             }
+           }
+         }
+       }
+      }
+      break;
 
     default:
       return false;
index 157105d788e7c9ea687ebfb516a99ab75d41d785..28a7da93de08312fc3ece50de516af6791e37050 100644 (file)
@@ -61,6 +61,8 @@ protected:
   LogChannelRef clog;
   LogChannelRef audit_clog;
 
+  std::map<std::string, std::string> pre_init_store;
+
   void load_all_metadata();
   std::map<std::string, std::string> load_store();
   void init();
index 221b93beed7b48dfb9e1a6ad864b20229da51160..1ae44143c7d80f36a5cf2e252fb98a0d672380c4 100644 (file)
@@ -183,6 +183,7 @@ void PyModuleRegistry::standby_start(MonClient &mc, Finisher &f)
 void PyModuleRegistry::active_start(
             DaemonStateIndex &ds, ClusterState &cs,
             const std::map<std::string, std::string> &kv_store,
+           bool mon_provides_kv_sub,
             MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
             Objecter &objecter_, Client &client_, Finisher &f,
             DaemonServer &server)
@@ -202,10 +203,13 @@ void PyModuleRegistry::active_start(
     standby_modules.reset();
   }
 
-  active_modules.reset(new ActivePyModules(
-              module_config, kv_store, ds, cs, mc,
-              clog_, audit_clog_, objecter_, client_, f, server,
-              *this));
+  active_modules.reset(
+    new ActivePyModules(
+      module_config,
+      kv_store, mon_provides_kv_sub,
+      ds, cs, mc,
+      clog_, audit_clog_, objecter_, client_, f, server,
+      *this));
 
   for (const auto &i : modules) {
     // Anything we're skipping because of !can_run will be flagged
index c16c4bf4c8ea6a02e3ebadece2c672ba55d64242..a5f38e8f65a2729f82d49ce06d690c8c281b2434 100644 (file)
@@ -66,6 +66,14 @@ public:
   void handle_config(const std::string &k, const std::string &v);
   void handle_config_notify();
 
+  void update_kv_data(
+    const std::string prefix,
+    bool incremental,
+    const map<std::string, boost::optional<bufferlist>, std::less<>>& data) {
+    ceph_assert(active_modules);
+    active_modules->update_kv_data(prefix, incremental, data);
+  }
+
   /**
    * Get references to all modules (whether they have loaded and/or
    * errored) or not.
@@ -99,6 +107,7 @@ public:
   void active_start(
                 DaemonStateIndex &ds, ClusterState &cs,
                 const std::map<std::string, std::string> &kv_store,
+               bool mon_provides_kv_sub,
                 MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
                 Objecter &objecter_, Client &client_, Finisher &f,
                 DaemonServer &server);