From: Sage Weil Date: Tue, 16 Feb 2021 19:43:19 +0000 (-0500) Subject: mon: allow subscription to kv/config-key data X-Git-Tag: v17.1.0~2900^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fbc536d014e308ea884a4af70f36401c4ffadfdd;p=ceph.git mon: allow subscription to kv/config-key data Allow subscription to config-key/kv data. Initially we'll send a full dump of the prefix. As changes occur, we'll send incremental diffs, unless the subscriber is too far behind, in which case we'll send a full dump again. There is a new message, MKVData, to support this. No compat issues since old clients won't subscribe to this stream unless they know how to handle it. Signed-off-by: Sage Weil --- diff --git a/src/messages/MKVData.h b/src/messages/MKVData.h new file mode 100644 index 000000000000..f735f79d7eac --- /dev/null +++ b/src/messages/MKVData.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "msg/Message.h" + +class MKVData : public Message { +public: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + + version_t version; + std::string prefix; + bool incremental = false; + + // use transparent comparator so we can lookup in it by std::string_view keys + std::map<std::string,std::optional<bufferlist>,std::less<>> data; + + MKVData() : Message{MSG_KV_DATA, HEAD_VERSION, COMPAT_VERSION} { } + + std::string_view get_type_name() const override { + return "kv_data"; + } + void print(std::ostream& o) const override { + o << "kv_data(v" << version + << " prefix " << prefix << ", " + << (incremental ? 
"incremental, " : "full, ") + << data.size() << " keys" << ")"; + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + decode(version, p); + decode(prefix, p); + decode(incremental, p); + decode(data, p); + } + + void encode_payload(uint64_t) override { + using ceph::encode; + encode(version, payload); + encode(prefix, payload); + encode(incremental, payload); + encode(data, payload); + } +}; diff --git a/src/mon/KVMonitor.cc b/src/mon/KVMonitor.cc index 0fd279d11f7e..5977e296c956 100644 --- a/src/mon/KVMonitor.cc +++ b/src/mon/KVMonitor.cc @@ -4,6 +4,7 @@ #include "mon/Monitor.h" #include "mon/KVMonitor.h" #include "include/stringify.h" +#include "messages/MKVData.h" #define dout_subsys ceph_subsys_mon #undef dout_prefix @@ -55,7 +56,7 @@ void KVMonitor::update_from_paxos(bool *need_bootstrap) } version = get_last_committed(); dout(10) << __func__ << " " << version << dendl; - //check_all_subs(); + check_all_subs(); } void KVMonitor::create_pending() @@ -417,3 +418,106 @@ void KVMonitor::do_osd_new( pending[dmcrypt_key_prefix] = dmcrypt_key_value; } + + +void KVMonitor::check_sub(MonSession *s) +{ + if (!s->authenticated) { + dout(20) << __func__ << " not authenticated " << s->entity_name << dendl; + return; + } + for (auto& p : s->sub_map) { + if (p.first.find("kv:") == 0) { + check_sub(p.second); + } + } +} + +void KVMonitor::check_sub(Subscription *sub) +{ + dout(10) << __func__ + << " next " << sub->next + << " have " << version << dendl; + if (sub->next <= version) { + maybe_send_update(sub); + if (sub->onetime) { + mon.with_session_map([sub](MonSessionMap& session_map) { + session_map.remove_sub(sub); + }); + } + } +} + +void KVMonitor::check_all_subs() +{ + dout(10) << __func__ << dendl; + int updated = 0, total = 0; + for (auto& i : mon.session_map.subs) { + if (i.first.find("kv:") == 0) { + auto p = i.second->begin(); + while (!p.end()) { + auto sub = *p; + ++p; + ++total; + if (maybe_send_update(sub)) { + ++updated; 
+ } + } + } + } + dout(10) << __func__ << " updated " << updated << " / " << total << dendl; +} + +bool KVMonitor::maybe_send_update(Subscription *sub) +{ + if (sub->next > version) { + return false; + } + + auto m = new MKVData; + m->prefix = sub->type.substr(3); + m->version = version; + + if (sub->next && sub->next > get_first_committed()) { + // incremental + m->incremental = true; + + for (version_t cur = sub->next; cur <= version; ++cur) { + bufferlist bl; + int err = get_version(cur, bl); + ceph_assert(err == 0); + + std::map<std::string,std::optional<bufferlist>> pending; + auto p = bl.cbegin(); + ceph::decode(pending, p); + + for (auto& i : pending) { + if (i.first.find(m->prefix) == 0) { + m->data[i.first] = i.second; + } + } + } + + dout(10) << __func__ << " incremental keys for " << m->prefix + << ", v " << sub->next << ".." << version + << ", " << m->data.size() << " keys" + << dendl; + } else { + m->incremental = false; + + KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX); + iter->lower_bound(m->prefix); + while (iter->valid() && + iter->key().find(m->prefix) == 0) { + m->data[iter->key()] = iter->value(); + iter->next(); + } + + dout(10) << __func__ << " sending full dump of " << m->prefix + << ", " << m->data.size() << " keys" + << dendl; + } + sub->session->con->send_message(m); + sub->next = version + 1; + return true; +} diff --git a/src/mon/KVMonitor.h b/src/mon/KVMonitor.h index 28c71f003c2a..8c0204390f53 100644 --- a/src/mon/KVMonitor.h +++ b/src/mon/KVMonitor.h @@ -49,4 +49,10 @@ public: const std::string& dmcrypt_key, std::stringstream& ss); void do_osd_new(const uuid_d& uuid, const std::string& dmcrypt_key); + + void check_sub(MonSession *s); + void check_sub(Subscription *sub); + void check_all_subs(); + + bool maybe_send_update(Subscription *sub); }; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 30eb07410b20..e176945cbe2b 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -5213,6 +5213,8 @@ void Monitor::handle_subscribe(MonOpRequestRef op) 
mgrstatmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "config") { configmon()->check_sub(s); + } else if (p->first.find("kv:") == 0) { + kvmon()->check_sub(s->sub_map[p->first]); } } diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 24413e7889f6..6c57d355bdf4 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -36,6 +36,7 @@ #include "messages/MMonPaxos.h" #include "messages/MConfig.h" #include "messages/MGetConfig.h" +#include "messages/MKVData.h" #include "messages/MMonProbe.h" #include "messages/MMonJoin.h" @@ -404,6 +405,9 @@ Message *decode_message(CephContext *cct, case MSG_GET_CONFIG: m = make_message<MGetConfig>(); break; + case MSG_KV_DATA: + m = make_message<MKVData>(); + break; case MSG_MON_PROBE: m = make_message<MMonProbe>(); diff --git a/src/msg/Message.h b/src/msg/Message.h index a36cc4dfbc27..8e1c4f967f05 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -67,6 +67,8 @@ #define MSG_CONFIG 62 #define MSG_GET_CONFIG 63 +#define MSG_KV_DATA 54 + #define MSG_MON_GET_PURGED_SNAPS 76 #define MSG_MON_GET_PURGED_SNAPS_REPLY 77