From: Sage Weil Date: Sat, 2 Dec 2017 04:42:45 +0000 (-0600) Subject: mon/ConfigMonitor: add monitor service for distributing configs X-Git-Tag: wip-pdonnell-testing-20180317.202121~121^2~123 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=38db0a289f572fa40542bd96a0d9ec717ac3db68;p=ceph-ci.git mon/ConfigMonitor: add monitor service for distributing configs Add 'config' subscribe option to get MConfigs on auth and when config is updated. Signed-off-by: Sage Weil --- diff --git a/src/messages/MMonSubscribe.h b/src/messages/MMonSubscribe.h index 74c2a87ea68..cc198c112f7 100644 --- a/src/messages/MMonSubscribe.h +++ b/src/messages/MMonSubscribe.h @@ -31,11 +31,13 @@ WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old) struct MMonSubscribe : public Message { - static const int HEAD_VERSION = 2; + static const int HEAD_VERSION = 3; + static const int COMPAT_VERSION = 1; + string hostname; map what; - MMonSubscribe() : Message(CEPH_MSG_MON_SUBSCRIBE, HEAD_VERSION) { } + MMonSubscribe() : Message(CEPH_MSG_MON_SUBSCRIBE, HEAD_VERSION, COMPAT_VERSION) { } private: ~MMonSubscribe() override {} @@ -67,16 +69,16 @@ public: if (q->second.onetime) what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME; } - } else { - decode(what, p); + return; + } + decode(what, p); + if (header.version >= 3) { + decode(hostname, p); } } void encode_payload(uint64_t features) override { using ceph::encode; - if (features & CEPH_FEATURE_SUBSCRIBE2) { - encode(what, payload); - header.version = HEAD_VERSION; - } else { + if ((features & CEPH_FEATURE_SUBSCRIBE2) == 0) { header.version = 0; map oldwhat; for (map::iterator q = what.begin(); @@ -90,7 +92,11 @@ public: oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME; } encode(oldwhat, payload); + return; } + header.version = HEAD_VERSION; + encode(what, payload); + encode(hostname, payload); } }; diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index 72ff9fd272d..5d7ed0c097d 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -20,6 +20,7 @@ #include "mon/ConfigKeyService.h" #include "mon/OSDMonitor.h" #include "mon/MDSMonitor.h" +#include "mon/ConfigMonitor.h" #include "messages/MMonCommand.h" #include "messages/MAuth.h" @@ -374,6 +375,7 @@ bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable) bufferlist::iterator indata = m->auth_payload.begin(); __u32 proto = m->protocol; bool start = false; + bool finished = false; EntityName entity_name; // set up handler? @@ -504,6 +506,7 @@ bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable) } s->caps.parse(str, NULL); s->auid = auid; + finished = true; } } catch (const buffer::error &err) { ret = -EINVAL; @@ -513,6 +516,9 @@ bool AuthMonitor::prep_auth(MonOpRequestRef op, bool paxos_writable) reply: reply = new MAuthReply(proto, &response_bl, ret, s->global_id); mon->send_reply(op, reply); + if (finished) { + mon->configmon()->check_sub(s); + } done: return true; } diff --git a/src/mon/CMakeLists.txt b/src/mon/CMakeLists.txt index 02b3ee409e0..403c954edaf 100644 --- a/src/mon/CMakeLists.txt +++ b/src/mon/CMakeLists.txt @@ -15,6 +15,7 @@ set(lib_mon_srcs LogMonitor.cc AuthMonitor.cc ConfigMap.cc + ConfigMonitor.cc Elector.cc HealthMonitor.cc PGMap.cc diff --git a/src/mon/ConfigMonitor.cc b/src/mon/ConfigMonitor.cc new file mode 100644 index 00000000000..f49e957256c --- /dev/null +++ b/src/mon/ConfigMonitor.cc @@ -0,0 +1,284 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "mon/Monitor.h" +#include "mon/ConfigMonitor.h" +#include "mon/OSDMonitor.h" +#include "messages/MConfig.h" +#include "common/Formatter.h" + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, mon, this) +static ostream& _prefix(std::ostream *_dout, const Monitor *mon, + const ConfigMonitor *hmon) { + return *_dout << "mon." << mon->name << "@" << mon->rank + << "(" << mon->get_state_name() << ").config "; +} + +const string KEY_PREFIX("config/"); + +ConfigMonitor::ConfigMonitor(Monitor *m, Paxos *p, const string& service_name) + : PaxosService(m, p, service_name) { +} + +void ConfigMonitor::init() +{ + dout(10) << __func__ << dendl; +} + +void ConfigMonitor::create_initial() +{ + dout(10) << __func__ << dendl; + version = 0; + pending.clear(); +} + +void ConfigMonitor::update_from_paxos(bool *need_bootstrap) +{ + version = get_last_committed(); + dout(10) << __func__ << dendl; + load_config(); + +#warning fixme: load changed sections to hint load_config() +} + +void ConfigMonitor::create_pending() +{ + dout(10) << " " << version << dendl; + pending.clear(); +} + +void ConfigMonitor::encode_pending(MonitorDBStore::TransactionRef t) +{ + ++version; + dout(10) << " " << version << dendl; + put_last_committed(t, version); + +#warning fixme: record changed sections (osd, mds.foo, rack:bar, ...) + + for (auto& p : pending) { + string key = KEY_PREFIX + p.first; + if (p.second) { + t->put(CONFIG_PREFIX, key, *p.second); + } else { + t->erase(CONFIG_PREFIX, key); + } + } +} + +version_t ConfigMonitor::get_trim_to() const +{ + // we don't actually need *any* old states, but keep a few. + if (version > 5) { + return version - 5; + } + return 0; +} + +bool ConfigMonitor::preprocess_query(MonOpRequestRef op) +{ + return false; +} + +bool ConfigMonitor::prepare_update(MonOpRequestRef op) +{ + Message *m = op->get_req(); + dout(7) << "prepare_update " << *m + << " from " << m->get_orig_source_inst() << dendl; + return false; +} + +void ConfigMonitor::tick() +{ + if (!is_active()) { + return; + } + dout(10) << __func__ << dendl; + bool changed = false; + if (changed) { + propose_pending(); + } +} + +void ConfigMonitor::on_active() +{ + check_all_subs(); +} + +void ConfigMonitor::load_config() +{ + unsigned num = 0; + KeyValueDB::Iterator it = mon->store->get_iterator(CONFIG_PREFIX); + it->lower_bound(KEY_PREFIX); + config_map.clear(); + while (it->valid() && + it->key().compare(0, KEY_PREFIX.size(), KEY_PREFIX) == 0) { + string key = it->key().substr(KEY_PREFIX.size()); + string value = it->value().to_str(); + vector split; + boost::split(split, key, [](char c){return c == '/';}); + string name = split.back(); + split.pop_back(); + Section *section = &config_map.global; + + Option fake_opt(name, Option::TYPE_STR, Option::LEVEL_DEV); + const Option *opt = g_conf->find_option(name); + if (!opt) { + dout(10) << __func__ << " unrecognized option '" << name << "'" << dendl; + opt = &fake_opt; + } + string err; + int r = opt->pre_validate(&value, &err); + if (r < 0) { + dout(10) << __func__ << " pre-validate failed on '" << name << "' = '" + << value << "' for " << split << dendl; + } + + MaskedOption mopt(*opt); + mopt.raw_value = value; + string device_class; + string loc_type, loc_value; + for (unsigned j = 0; j < split.size(); ++j) { + auto& i = split[j]; + size_t delim = i.find(':'); + if (delim != std::string::npos) { + string k = i.substr(0, delim); + if (k == "class") { + mopt.device_class = i.substr(delim + 1); + } else { + mopt.location_type = k; + mopt.location_value = i.substr(delim + 1); + } + continue; + } + if (split.front().find('.') != std::string::npos) { + section = &config_map.by_id[i]; + } else { + section = &config_map.by_type[i]; + } + } + section->options.insert(make_pair(name, mopt)); + ++num; + it->next(); + } + dout(10) << __func__ << " got " << num << " keys" << dendl; + dout(20) << __func__ << " config map:\n"; + JSONFormatter jf(true); + jf.open_object_section("config_map"); + config_map.dump(&jf); + jf.close_section(); + jf.flush(*_dout); + *_dout << dendl; +} + +bool ConfigMonitor::refresh_config(MonSession *s) +{ + const OSDMap& osdmap = mon->osdmon()->osdmap; + + map crush_location; + if (s->remote_host.size()) { + osdmap.crush->get_full_location(s->remote_host, &crush_location); + dout(10) << __func__ << " crush_location for remote_host " << s->remote_host + << " is " << crush_location << dendl; + } + + string device_class; + if (s->inst.name.is_osd()) { + const char *c = osdmap.crush->get_item_class(s->inst.name.num()); + if (c) { + device_class = c; + dout(10) << __func__ << " device_class " << device_class << dendl; + } + } + + dout(20) << __func__ << " " << s->entity_name << " crush " << crush_location + << " device_class " << device_class << dendl; + map out; + config_map.generate_entity_map( + s->entity_name, + crush_location, + osdmap.crush.get(), + device_class, + &out); + + if (out == s->last_config) { + dout(20) << __func__ << " no change, " << out << dendl; + return false; + } + + dout(20) << __func__ << " " << out << dendl; + s->last_config = out; + return true; +} + +bool ConfigMonitor::maybe_send_config(MonSession *s) +{ + bool changed = refresh_config(s); + dout(10) << __func__ << " to " << s->inst << " " + << (changed ? "(changed)" : "(unchanged)") + << dendl; + if (changed) { + send_config(s); + } + return changed; +} + +void ConfigMonitor::send_config(MonSession *s) +{ + dout(10) << __func__ << " to " << s->inst << dendl; + auto m = new MConfig(s->last_config); + s->con->send_message(m); +} + +void ConfigMonitor::check_sub(MonSession *s) +{ + if (!s->is_capable(s->entity_name.get_type_str(), MON_CAP_R)) { + dout(20) << __func__ << " not capable for " << s->entity_name << " with " + << s->caps << dendl; + return; + } + auto p = s->sub_map.find("config"); + if (p != s->sub_map.end()) { + check_sub(p->second); + } +} + +void ConfigMonitor::check_sub(Subscription *sub) +{ + dout(10) << __func__ + << " next " << sub->next + << " have " << version << dendl; + if (sub->next <= version) { + maybe_send_config(sub->session); + if (sub->onetime) { + mon->with_session_map([this, sub](MonSessionMap& session_map) { + session_map.remove_sub(sub); + }); + } else { + sub->next = version + 1; + } + } +} + +void ConfigMonitor::check_all_subs() +{ + dout(10) << __func__ << dendl; + auto subs = mon->session_map.subs.find("config"); + if (subs == mon->session_map.subs.end()) { + return; + } + int updated = 0, total = 0; + auto p = subs->second->begin(); + while (!p.end()) { + auto sub = *p; + ++p; + ++total; + if (maybe_send_config(sub->session)) { + ++updated; + } + } + dout(10) << __func__ << " updated " << updated << " / " << total << dendl; +} diff --git a/src/mon/ConfigMonitor.h b/src/mon/ConfigMonitor.h new file mode 100644 index 00000000000..5559385360e --- /dev/null +++ b/src/mon/ConfigMonitor.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include "ConfigMap.h" +#include "mon/PaxosService.h" + +class MonSession; + +class ConfigMonitor : public PaxosService +{ + version_t version = 0; + ConfigMap config_map; + map> pending; + +public: + ConfigMonitor(Monitor *m, Paxos *p, const string& service_name); + + void init() override; + + void load_config(); + + bool preprocess_query(MonOpRequestRef op) override; + bool prepare_update(MonOpRequestRef op) override; + + void create_initial() override; + void update_from_paxos(bool *need_bootstrap) override; + void create_pending() override; + void encode_pending(MonitorDBStore::TransactionRef t) override; + version_t get_trim_to() const override; + + void encode_full(MonitorDBStore::TransactionRef t) override { } + + void on_active() override; + void tick() override; + + bool refresh_config(MonSession *s); + bool maybe_send_config(MonSession *s); + void send_config(MonSession *s); + void check_sub(MonSession *s); + void check_sub(Subscription *sub); + void check_all_subs(); +}; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 4c6ead9d507..4a23a73f73f 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -74,6 +74,7 @@ #include "AuthMonitor.h" #include "MgrMonitor.h" #include "MgrStatMonitor.h" +#include "ConfigMonitor.h" #include "mon/QuorumService.h" #include "mon/HealthMonitor.h" #include "mon/ConfigKeyService.h" @@ -188,6 +189,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr"); paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat"); paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health"); + paxos_service[PAXOS_CONFIG] = new ConfigMonitor(this, paxos, "config"); config_key_service = new ConfigKeyService(this, paxos); @@ -4783,6 +4785,8 @@ void Monitor::handle_subscribe(MonOpRequestRef op) mgrmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "servicemap") { mgrstatmon()->check_sub(s->sub_map[p->first]); + } else if (p->first == "config") { + configmon()->check_sub(s); } } diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 3bf4c3c30ca..5c626905674 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -639,6 +639,10 @@ public: return (class HealthMonitor*) paxos_service[PAXOS_HEALTH]; } + class ConfigMonitor *configmon() { + return (class ConfigMonitor*) paxos_service[PAXOS_CONFIG]; + } + friend class Paxos; friend class OSDMonitor; friend class MDSMonitor; diff --git a/src/mon/Session.h b/src/mon/Session.h index fbe21c9c771..a45e8c02e78 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -60,6 +60,9 @@ struct MonSession : public RefCountedObject { ConnectionRef proxy_con; uint64_t proxy_tid; + string remote_host; ///< remote host name + map last_config; ///< most recently shared config + MonSession(const entity_inst_t& i, Connection *c) : RefCountedObject(g_ceph_context), con(c), diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index c705ded5f81..8e753ea6855 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -31,7 +31,8 @@ #define PAXOS_MGR 5 #define PAXOS_MGRSTAT 6 #define PAXOS_HEALTH 7 -#define PAXOS_NUM 8 +#define PAXOS_CONFIG 8 +#define PAXOS_NUM 9 inline const char *get_paxos_name(int p) { switch (p) { @@ -43,6 +44,7 @@ inline const char *get_paxos_name(int p) { case PAXOS_MGR: return "mgr"; case PAXOS_MGRSTAT: return "mgrstat"; case PAXOS_HEALTH: return "health"; + case PAXOS_CONFIG: return "config"; default: ceph_abort(); return 0; } }