--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+
+class MKVData : public Message {
+public:
+ static constexpr int HEAD_VERSION = 1;
+ static constexpr int COMPAT_VERSION = 1;
+
+ version_t version;
+ std::string prefix;
+ bool incremental = false;
+
+ // use transparent comparator so we can lookup in it by std::string_view keys
+ std::map<std::string,boost::optional<bufferlist>,std::less<>> data;
+
+ MKVData() : Message{MSG_KV_DATA, HEAD_VERSION, COMPAT_VERSION} { }
+
+ std::string_view get_type_name() const override {
+ return "kv_data";
+ }
+ void print(std::ostream& o) const override {
+ o << "kv_data(v" << version
+ << " prefix " << prefix << ", "
+ << (incremental ? "incremental, " : "full, ")
+ << data.size() << " keys" << ")";
+ }
+
+ void decode_payload() override {
+ using ceph::decode;
+ auto p = payload.cbegin();
+ decode(version, p);
+ decode(prefix, p);
+ decode(incremental, p);
+ decode(data, p);
+ }
+
+ void encode_payload(uint64_t) override {
+ using ceph::encode;
+ encode(version, payload);
+ encode(prefix, payload);
+ encode(incremental, payload);
+ encode(data, payload);
+ }
+};
#include "mon/Monitor.h"
#include "mon/KVMonitor.h"
#include "include/stringify.h"
+#include "messages/MKVData.h"
#define dout_subsys ceph_subsys_mon
#undef dout_prefix
}
version = get_last_committed();
dout(10) << __func__ << " " << version << dendl;
- //check_all_subs();
+ check_all_subs();
}
void KVMonitor::create_pending()
pending[dmcrypt_key_prefix] = dmcrypt_key_value;
}
+
+
+void KVMonitor::check_sub(MonSession *s)
+{
+ if (!s->authenticated) {
+ dout(20) << __func__ << " not authenticated " << s->entity_name << dendl;
+ return;
+ }
+ for (auto& p : s->sub_map) {
+ if (p.first.find("kv:") == 0) {
+ check_sub(p.second);
+ }
+ }
+}
+
+void KVMonitor::check_sub(Subscription *sub)
+{
+ dout(10) << __func__
+ << " next " << sub->next
+ << " have " << version << dendl;
+ if (sub->next <= version) {
+ maybe_send_update(sub);
+ if (sub->onetime) {
+ mon.with_session_map([sub](MonSessionMap& session_map) {
+ session_map.remove_sub(sub);
+ });
+ }
+ }
+}
+
+void KVMonitor::check_all_subs()
+{
+ dout(10) << __func__ << dendl;
+ int updated = 0, total = 0;
+ for (auto& i : mon.session_map.subs) {
+ if (i.first.find("kv:") == 0) {
+ auto p = i.second->begin();
+ while (!p.end()) {
+ auto sub = *p;
+ ++p;
+ ++total;
+ if (maybe_send_update(sub)) {
+ ++updated;
+ }
+ }
+ }
+ }
+ dout(10) << __func__ << " updated " << updated << " / " << total << dendl;
+}
+
+bool KVMonitor::maybe_send_update(Subscription *sub)
+{
+ if (sub->next > version) {
+ return false;
+ }
+
+ auto m = new MKVData;
+ m->prefix = sub->type.substr(3);
+ m->version = version;
+
+ if (sub->next && sub->next > get_first_committed()) {
+ // incremental
+ m->incremental = true;
+
+ for (version_t cur = sub->next; cur <= version; ++cur) {
+ bufferlist bl;
+ int err = get_version(cur, bl);
+ ceph_assert(err == 0);
+
+ std::map<std::string,boost::optional<ceph::buffer::list>> pending;
+ auto p = bl.cbegin();
+ ceph::decode(pending, p);
+
+ for (auto& i : pending) {
+ if (i.first.find(m->prefix) == 0) {
+ m->data[i.first] = i.second;
+ }
+ }
+ }
+
+ dout(10) << __func__ << " incremental keys for " << m->prefix
+ << ", v " << sub->next << ".." << version
+ << ", " << m->data.size() << " keys"
+ << dendl;
+ } else {
+ m->incremental = false;
+
+ KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
+ iter->lower_bound(m->prefix);
+ while (iter->valid() &&
+ iter->key().find(m->prefix) == 0) {
+ m->data[iter->key()] = iter->value();
+ iter->next();
+ }
+
+ dout(10) << __func__ << " sending full dump of " << m->prefix
+ << ", " << m->data.size() << " keys"
+ << dendl;
+ }
+ sub->session->con->send_message(m);
+ sub->next = version + 1;
+ return true;
+}