]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mon/ConfigMonitor: add monitor service for distributing configs
authorSage Weil <sage@redhat.com>
Sat, 2 Dec 2017 04:42:45 +0000 (22:42 -0600)
committerSage Weil <sage@redhat.com>
Tue, 6 Mar 2018 20:44:48 +0000 (14:44 -0600)
Add 'config' subscribe option to get MConfigs on auth and when config is
updated.

Signed-off-by: Sage Weil <sage@redhat.com>
src/messages/MMonSubscribe.h
src/mon/AuthMonitor.cc
src/mon/CMakeLists.txt
src/mon/ConfigMonitor.cc [new file with mode: 0644]
src/mon/ConfigMonitor.h [new file with mode: 0644]
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/Session.h
src/mon/mon_types.h

index 74c2a87ea68abbc161d9a9ccb19fa98d1025002a..cc198c112f77a90373a7ebdf844a7dfea886c2e3 100644 (file)
@@ -31,11 +31,13 @@ WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old)
 
 struct MMonSubscribe : public Message {
 
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
+  static const int COMPAT_VERSION = 1;
 
+  string hostname;
   map<string, ceph_mon_subscribe_item> what;
   
-  MMonSubscribe() : Message(CEPH_MSG_MON_SUBSCRIBE, HEAD_VERSION) { }
+  MMonSubscribe() : Message(CEPH_MSG_MON_SUBSCRIBE, HEAD_VERSION, COMPAT_VERSION) { }
 private:
   ~MMonSubscribe() override {}
 
@@ -67,16 +69,16 @@ public:
        if (q->second.onetime)
          what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME;
       }
-    } else {
-      decode(what, p);
+      return;
+    }
+    decode(what, p);
+    if (header.version >= 3) {
+      decode(hostname, p);
     }
   }
   void encode_payload(uint64_t features) override {
     using ceph::encode;
-    if (features & CEPH_FEATURE_SUBSCRIBE2) {
-      encode(what, payload);
-      header.version = HEAD_VERSION;
-    } else {
+    if ((features & CEPH_FEATURE_SUBSCRIBE2) == 0) {
       header.version = 0;
       map<string, ceph_mon_subscribe_item_old> oldwhat;
       for (map<string, ceph_mon_subscribe_item>::iterator q = what.begin();
@@ -90,7 +92,11 @@ public:
        oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME;
       }
       encode(oldwhat, payload);
+      return;
     }
+    header.version = HEAD_VERSION;
+    encode(what, payload);
+    encode(hostname, payload);
   }
 };
 
index 72ff9fd272d357b252454375e63fdd4194e9a835..5d7ed0c097dc068d4f6da4d761892dad151bb03a 100644 (file)
@@ -20,6 +20,7 @@
 #include "mon/ConfigKeyService.h"
 #include "mon/OSDMonitor.h"
 #include "mon/MDSMonitor.h"
+#include "mon/ConfigMonitor.h"
 
 #include "messages/MMonCommand.h"
 #include "messages/MAuth.h"
@@ -374,6 +375,7 @@ bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable)
   bufferlist::iterator indata = m->auth_payload.begin();
   __u32 proto = m->protocol;
   bool start = false;
+  bool finished = false;
   EntityName entity_name;
 
   // set up handler?
@@ -504,6 +506,7 @@ bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable)
       }
       s->caps.parse(str, NULL);
       s->auid = auid;
+      finished = true;
     }
   } catch (const buffer::error &err) {
     ret = -EINVAL;
@@ -513,6 +516,9 @@ bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable)
 reply:
   reply = new MAuthReply(proto, &response_bl, ret, s->global_id);
   mon->send_reply(op, reply);
+  if (finished) {
+    mon->configmon()->check_sub(s);
+  }
 done:
   return true;
 }
index 02b3ee409e0541c375f907e4437918b46f933c95..403c954edaf2c8ee498434fb8e337b1bbb8f7372 100644 (file)
@@ -15,6 +15,7 @@ set(lib_mon_srcs
   LogMonitor.cc
   AuthMonitor.cc
   ConfigMap.cc
+  ConfigMonitor.cc
   Elector.cc
   HealthMonitor.cc
   PGMap.cc
diff --git a/src/mon/ConfigMonitor.cc b/src/mon/ConfigMonitor.cc
new file mode 100644 (file)
index 0000000..f49e957
--- /dev/null
@@ -0,0 +1,284 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/algorithm/string/split.hpp>
+
+#include "mon/Monitor.h"
+#include "mon/ConfigMonitor.h"
+#include "mon/OSDMonitor.h"
+#include "messages/MConfig.h"
+#include "common/Formatter.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon, this)
+static ostream& _prefix(std::ostream *_dout, const Monitor *mon,
+                        const ConfigMonitor *hmon) {
+  return *_dout << "mon." << mon->name << "@" << mon->rank
+               << "(" << mon->get_state_name() << ").config ";
+}
+
+const string KEY_PREFIX("config/");
+
+ConfigMonitor::ConfigMonitor(Monitor *m, Paxos *p, const string& service_name)
+  : PaxosService(m, p, service_name) {
+}
+
+void ConfigMonitor::init()
+{
+  dout(10) << __func__ << dendl;
+}
+
+void ConfigMonitor::create_initial()
+{
+  dout(10) << __func__ << dendl;
+  version = 0;
+  pending.clear();
+}
+
+void ConfigMonitor::update_from_paxos(bool *need_bootstrap)
+{
+  version = get_last_committed();
+  dout(10) << __func__ << dendl;
+  load_config();
+
+#warning fixme: load changed sections to hint load_config()
+}
+
+void ConfigMonitor::create_pending()
+{
+  dout(10) << " " << version << dendl;
+  pending.clear();
+}
+
+void ConfigMonitor::encode_pending(MonitorDBStore::TransactionRef t)
+{
+  ++version;
+  dout(10) << " " << version << dendl;
+  put_last_committed(t, version);
+
+#warning fixme: record changed sections (osd, mds.foo, rack:bar, ...)
+
+  for (auto& p : pending) {
+    string key = KEY_PREFIX + p.first;
+    if (p.second) {
+      t->put(CONFIG_PREFIX, key, *p.second);
+    } else {
+      t->erase(CONFIG_PREFIX, key);
+    }
+  }
+}
+
+version_t ConfigMonitor::get_trim_to() const
+{
+  // we don't actually need *any* old states, but keep a few.
+  if (version > 5) {
+    return version - 5;
+  }
+  return 0;
+}
+
+bool ConfigMonitor::preprocess_query(MonOpRequestRef op)
+{
+  return false;
+}
+
+bool ConfigMonitor::prepare_update(MonOpRequestRef op)
+{
+  Message *m = op->get_req();
+  dout(7) << "prepare_update " << *m
+         << " from " << m->get_orig_source_inst() << dendl;
+  return false;
+}
+
+void ConfigMonitor::tick()
+{
+  if (!is_active()) {
+    return;
+  }
+  dout(10) << __func__ << dendl;
+  bool changed = false;
+  if (changed) {
+    propose_pending();
+  }
+}
+
+void ConfigMonitor::on_active()
+{
+  check_all_subs();
+}
+
+void ConfigMonitor::load_config()
+{
+  unsigned num = 0;
+  KeyValueDB::Iterator it = mon->store->get_iterator(CONFIG_PREFIX);
+  it->lower_bound(KEY_PREFIX);
+  config_map.clear();
+  while (it->valid() &&
+        it->key().compare(0, KEY_PREFIX.size(), KEY_PREFIX) == 0) {
+    string key = it->key().substr(KEY_PREFIX.size());
+    string value = it->value().to_str();
+    vector<string> split;
+    boost::split(split, key, [](char c){return c == '/';});
+    string name = split.back();
+    split.pop_back();
+    Section *section = &config_map.global;
+
+    Option fake_opt(name, Option::TYPE_STR, Option::LEVEL_DEV);
+    const Option *opt = g_conf->find_option(name);
+    if (!opt) {
+      dout(10) << __func__ << " unrecognized option '" << name << "'" << dendl;
+      opt = &fake_opt;
+    }
+    string err;
+    int r = opt->pre_validate(&value, &err);
+    if (r < 0) {
+      dout(10) << __func__ << " pre-validate failed on '" << name << "' = '"
+              << value << "' for " << split << dendl;
+    }
+
+    MaskedOption mopt(*opt);
+    mopt.raw_value = value;
+    string device_class;
+    string loc_type, loc_value;
+    for (unsigned j = 0; j < split.size(); ++j) {
+      auto& i = split[j];
+      size_t delim = i.find(':');
+      if (delim != std::string::npos) {
+       string k = i.substr(0, delim);
+       if (k == "class") {
+         mopt.device_class = i.substr(delim + 1);
+       } else {
+         mopt.location_type = k;
+         mopt.location_value = i.substr(delim + 1);
+       }
+       continue;
+      }
+      if (split.front().find('.') != std::string::npos) {
+       section = &config_map.by_id[i];
+      } else {
+       section = &config_map.by_type[i];
+      }
+    }
+    section->options.insert(make_pair(name, mopt));
+    ++num;
+    it->next();
+  }
+  dout(10) << __func__ << " got " << num << " keys" << dendl;
+  dout(20) << __func__ << " config map:\n";
+  JSONFormatter jf(true);
+  jf.open_object_section("config_map");
+  config_map.dump(&jf);
+  jf.close_section();
+  jf.flush(*_dout);
+  *_dout << dendl;
+}
+
+bool ConfigMonitor::refresh_config(MonSession *s)
+{
+  const OSDMap& osdmap = mon->osdmon()->osdmap;
+
+  map<string,string> crush_location;
+  if (s->remote_host.size()) {
+    osdmap.crush->get_full_location(s->remote_host, &crush_location);
+    dout(10) << __func__ << " crush_location for remote_host " << s->remote_host
+            << " is " << crush_location << dendl;
+  }
+
+  string device_class;
+  if (s->inst.name.is_osd()) {
+    const char *c = osdmap.crush->get_item_class(s->inst.name.num());
+    if (c) {
+      device_class = c;
+      dout(10) << __func__ << " device_class " << device_class << dendl;
+    }
+  }
+
+  dout(20) << __func__ << " " << s->entity_name << " crush " << crush_location
+          << " device_class " << device_class << dendl;
+  map<string,string> out;
+  config_map.generate_entity_map(
+    s->entity_name,
+    crush_location,
+    osdmap.crush.get(),
+    device_class,
+    &out);
+
+  if (out == s->last_config) {
+    dout(20) << __func__ << " no change, " << out << dendl;
+    return false;
+  }
+
+  dout(20) << __func__ << " " << out << dendl;
+  s->last_config = out;
+  return true;
+}
+
+bool ConfigMonitor::maybe_send_config(MonSession *s)
+{
+  bool changed = refresh_config(s);
+  dout(10) << __func__ << " to " << s->inst << " "
+          << (changed ? "(changed)" : "(unchanged)")
+          << dendl;
+  if (changed) {
+    send_config(s);
+  }
+  return changed;
+}
+
+void ConfigMonitor::send_config(MonSession *s)
+{
+  dout(10) << __func__ << " to " << s->inst << dendl;
+  auto m = new MConfig(s->last_config);
+  s->con->send_message(m);
+}
+
+void ConfigMonitor::check_sub(MonSession *s)
+{
+  if (!s->is_capable(s->entity_name.get_type_str(), MON_CAP_R)) {
+    dout(20) << __func__ << " not capable for " << s->entity_name << " with "
+            << s->caps << dendl;
+    return;
+  }
+  auto p = s->sub_map.find("config");
+  if (p != s->sub_map.end()) {
+    check_sub(p->second);
+  }
+}
+
+void ConfigMonitor::check_sub(Subscription *sub)
+{
+  dout(10) << __func__
+          << " next " << sub->next
+          << " have " << version << dendl;
+  if (sub->next <= version) {
+    maybe_send_config(sub->session);
+    if (sub->onetime) {
+      mon->with_session_map([this, sub](MonSessionMap& session_map) {
+         session_map.remove_sub(sub);
+       });
+    } else {
+      sub->next = version + 1;
+    }
+  }
+}
+
+void ConfigMonitor::check_all_subs()
+{
+  dout(10) << __func__ << dendl;
+  auto subs = mon->session_map.subs.find("config");
+  if (subs == mon->session_map.subs.end()) {
+    return;
+  }
+  int updated = 0, total = 0;
+  auto p = subs->second->begin();
+  while (!p.end()) {
+    auto sub = *p;
+    ++p;
+    ++total;
+    if (maybe_send_config(sub->session)) {
+      ++updated;
+    }
+  }
+  dout(10) << __func__ << " updated " << updated << " / " << total << dendl;
+}
diff --git a/src/mon/ConfigMonitor.h b/src/mon/ConfigMonitor.h
new file mode 100644 (file)
index 0000000..5559385
--- /dev/null
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "ConfigMap.h"
+#include "mon/PaxosService.h"
+
+class MonSession;
+
+class ConfigMonitor : public PaxosService
+{
+  version_t version = 0;
+  ConfigMap config_map;
+  map<string,boost::optional<bufferlist>> pending;
+
+public:
+  ConfigMonitor(Monitor *m, Paxos *p, const string& service_name);
+
+  void init() override;
+
+  void load_config();
+
+  bool preprocess_query(MonOpRequestRef op) override;
+  bool prepare_update(MonOpRequestRef op) override;
+
+  void create_initial() override;
+  void update_from_paxos(bool *need_bootstrap) override;
+  void create_pending() override;
+  void encode_pending(MonitorDBStore::TransactionRef t) override;
+  version_t get_trim_to() const override;
+
+  void encode_full(MonitorDBStore::TransactionRef t) override { }
+
+  void on_active() override;
+  void tick() override;
+
+  bool refresh_config(MonSession *s);
+  bool maybe_send_config(MonSession *s);
+  void send_config(MonSession *s);
+  void check_sub(MonSession *s);
+  void check_sub(Subscription *sub);
+  void check_all_subs();
+};
index 4c6ead9d507ff81c8377df3f6e180cf37d2e183f..4a23a73f73f5217950f814319bd4e29851b2d095 100644 (file)
@@ -74,6 +74,7 @@
 #include "AuthMonitor.h"
 #include "MgrMonitor.h"
 #include "MgrStatMonitor.h"
+#include "ConfigMonitor.h"
 #include "mon/QuorumService.h"
 #include "mon/HealthMonitor.h"
 #include "mon/ConfigKeyService.h"
@@ -188,6 +189,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
   paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
   paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
+  paxos_service[PAXOS_CONFIG] = new ConfigMonitor(this, paxos, "config");
 
   config_key_service = new ConfigKeyService(this, paxos);
 
@@ -4783,6 +4785,8 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       mgrmon()->check_sub(s->sub_map[p->first]);
     } else if (p->first == "servicemap") {
       mgrstatmon()->check_sub(s->sub_map[p->first]);
+    } else if (p->first == "config") {
+      configmon()->check_sub(s);
     }
   }
 
index 3bf4c3c30ca9b7c477bd3402683508bdeb4ccc94..5c6269056747e0695471ab721918c4f9428b395b 100644 (file)
@@ -639,6 +639,10 @@ public:
     return (class HealthMonitor*) paxos_service[PAXOS_HEALTH];
   }
 
+  class ConfigMonitor *configmon() {
+    return (class ConfigMonitor*) paxos_service[PAXOS_CONFIG];
+  }
+
   friend class Paxos;
   friend class OSDMonitor;
   friend class MDSMonitor;
index fbe21c9c771df14084882456e79c8a3ff78818d3..a45e8c02e78a70723be2d0d5db4b810c72c1f1a3 100644 (file)
@@ -60,6 +60,9 @@ struct MonSession : public RefCountedObject {
   ConnectionRef proxy_con;
   uint64_t proxy_tid;
 
+  string remote_host;                ///< remote host name
+  map<string,string> last_config;    ///< most recently shared config
+
   MonSession(const entity_inst_t& i, Connection *c) :
     RefCountedObject(g_ceph_context),
     con(c),
index c705ded5f8114b706f42df8c1cb2a55606443c26..8e753ea685558c65a49510428f97c37bfc5d1081 100644 (file)
@@ -31,7 +31,8 @@
 #define PAXOS_MGR        5
 #define PAXOS_MGRSTAT    6
 #define PAXOS_HEALTH     7
-#define PAXOS_NUM        8
+#define PAXOS_CONFIG     8
+#define PAXOS_NUM        9
 
 inline const char *get_paxos_name(int p) {
   switch (p) {
@@ -43,6 +44,7 @@ inline const char *get_paxos_name(int p) {
   case PAXOS_MGR: return "mgr";
   case PAXOS_MGRSTAT: return "mgrstat";
   case PAXOS_HEALTH: return "health";
+  case PAXOS_CONFIG: return "config";
   default: ceph_abort(); return 0;
   }
 }