]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: allow subscription to kv/config-key data
authorSage Weil <sage@newdream.net>
Tue, 16 Feb 2021 19:43:19 +0000 (14:43 -0500)
committerSage Weil <sage@newdream.net>
Sun, 21 Feb 2021 13:52:21 +0000 (07:52 -0600)
Allow subscription to config-key/kv data.  Initially we'll send a full
dump of the prefix.  As changes occur, we'll send incremental diffs,
unless the subscriber is too far behind, in which case we'll send a full
dump again.

There is a new message, MKVData, to support this.

No compat issues since old clients won't subscribe to this stream unless
they know how to handle it.

Signed-off-by: Sage Weil <sage@newdream.net>
(cherry picked from commit fbc536d014e308ea884a4af70f36401c4ffadfdd)

src/messages/MKVData.h [new file with mode: 0644]
src/mon/KVMonitor.cc
src/mon/KVMonitor.h
src/mon/Monitor.cc
src/msg/Message.cc
src/msg/Message.h

diff --git a/src/messages/MKVData.h b/src/messages/MKVData.h
new file mode 100644 (file)
index 0000000..f735f79
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+
+class MKVData : public Message {
+public:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+
+  version_t version;
+  std::string prefix;
+  bool incremental = false;
+
+  // use transparent comparator so we can lookup in it by std::string_view keys
+  std::map<std::string,boost::optional<bufferlist>,std::less<>> data;
+
+  MKVData() : Message{MSG_KV_DATA, HEAD_VERSION, COMPAT_VERSION} { }
+
+  std::string_view get_type_name() const override {
+    return "kv_data";
+  }
+  void print(std::ostream& o) const override {
+    o << "kv_data(v" << version
+      << " prefix " << prefix << ", "
+      << (incremental ? "incremental, " : "full, ")
+      << data.size() << " keys" << ")";
+  }
+
+  void decode_payload() override {
+    using ceph::decode;
+    auto p = payload.cbegin();
+    decode(version, p);
+    decode(prefix, p);
+    decode(incremental, p);
+    decode(data, p);
+  }
+
+  void encode_payload(uint64_t) override {
+    using ceph::encode;
+    encode(version, payload);
+    encode(prefix, payload);
+    encode(incremental, payload);
+    encode(data, payload);
+  }
+};
index 0fd279d11f7e507c643c9c9a868961a570612e7a..5977e296c9567ffb70472927103d4eb0fad7bcee 100644 (file)
@@ -4,6 +4,7 @@
 #include "mon/Monitor.h"
 #include "mon/KVMonitor.h"
 #include "include/stringify.h"
+#include "messages/MKVData.h"
 
 #define dout_subsys ceph_subsys_mon
 #undef dout_prefix
@@ -55,7 +56,7 @@ void KVMonitor::update_from_paxos(bool *need_bootstrap)
   }
   version = get_last_committed();
   dout(10) << __func__ << " " << version << dendl;
-  //check_all_subs();
+  check_all_subs();
 }
 
 void KVMonitor::create_pending()
@@ -417,3 +418,106 @@ void KVMonitor::do_osd_new(
 
   pending[dmcrypt_key_prefix] = dmcrypt_key_value;
 }
+
+
+void KVMonitor::check_sub(MonSession *s)
+{
+  if (!s->authenticated) {
+    dout(20) << __func__ << " not authenticated " << s->entity_name << dendl;
+    return;
+  }
+  for (auto& p : s->sub_map) {
+    if (p.first.find("kv:") == 0) {
+      check_sub(p.second);
+    }
+  }
+}
+
+void KVMonitor::check_sub(Subscription *sub)
+{
+  dout(10) << __func__
+          << " next " << sub->next
+          << " have " << version << dendl;
+  if (sub->next <= version) {
+    maybe_send_update(sub);
+    if (sub->onetime) {
+      mon.with_session_map([sub](MonSessionMap& session_map) {
+         session_map.remove_sub(sub);
+       });
+    }
+  }
+}
+
+void KVMonitor::check_all_subs()
+{
+  dout(10) << __func__ << dendl;
+  int updated = 0, total = 0;
+  for (auto& i : mon.session_map.subs) {
+    if (i.first.find("kv:") == 0) {
+      auto p = i.second->begin();
+      while (!p.end()) {
+       auto sub = *p;
+       ++p;
+       ++total;
+       if (maybe_send_update(sub)) {
+         ++updated;
+       }
+      }
+    }
+  }
+  dout(10) << __func__ << " updated " << updated << " / " << total << dendl;
+}
+
+bool KVMonitor::maybe_send_update(Subscription *sub)
+{
+  if (sub->next > version) {
+    return false;
+  }
+
+  auto m = new MKVData;
+  m->prefix = sub->type.substr(3);
+  m->version = version;
+
+  if (sub->next && sub->next > get_first_committed()) {
+    // incremental
+    m->incremental = true;
+
+    for (version_t cur = sub->next; cur <= version; ++cur) {
+      bufferlist bl;
+      int err = get_version(cur, bl);
+      ceph_assert(err == 0);
+
+      std::map<std::string,boost::optional<ceph::buffer::list>> pending;
+      auto p = bl.cbegin();
+      ceph::decode(pending, p);
+
+      for (auto& i : pending) {
+       if (i.first.find(m->prefix) == 0) {
+         m->data[i.first] = i.second;
+       }
+      }
+    }
+
+    dout(10) << __func__ << " incremental keys for " << m->prefix
+            << ", v " << sub->next << ".." << version
+            << ", " << m->data.size() << " keys"
+            << dendl;
+  } else {
+    m->incremental = false;
+
+    KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
+    iter->lower_bound(m->prefix);
+    while (iter->valid() &&
+          iter->key().find(m->prefix) == 0) {
+      m->data[iter->key()] = iter->value();
+      iter->next();
+    }
+
+    dout(10) << __func__ << " sending full dump of " << m->prefix
+            << ", " << m->data.size() << " keys"
+            << dendl;
+  }
+  sub->session->con->send_message(m);
+  sub->next = version + 1;
+  return true;
+}
index 28c71f003c2ad74d9ed0f9c3e96755e66b98ebb2..8c0204390f53aad5034ffef853be39581243d52d 100644 (file)
@@ -49,4 +49,10 @@ public:
       const std::string& dmcrypt_key,
       std::stringstream& ss);
   void do_osd_new(const uuid_d& uuid, const std::string& dmcrypt_key);
+
+  void check_sub(MonSession *s);
+  void check_sub(Subscription *sub);
+  void check_all_subs();
+
+  bool maybe_send_update(Subscription *sub);
 };
index 651d86a3533ad1bf76a38626220dd8be7699ee15..5221f3c05ecdd1e4f5a47ad160b223d22d5cb73a 100644 (file)
@@ -5202,6 +5202,8 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       mgrstatmon()->check_sub(s->sub_map[p->first]);
     } else if (p->first == "config") {
       configmon()->check_sub(s);
+    } else if (p->first.find("kv:") == 0) {
+      kvmon()->check_sub(s->sub_map[p->first]);
     }
   }
 
index 24413e7889f6edc288e393d36008977daa2e5de7..6c57d355bdf49f95f90a2f624a3a705290c2cd6d 100644 (file)
@@ -36,6 +36,7 @@
 #include "messages/MMonPaxos.h"
 #include "messages/MConfig.h"
 #include "messages/MGetConfig.h"
+#include "messages/MKVData.h"
 
 #include "messages/MMonProbe.h"
 #include "messages/MMonJoin.h"
@@ -404,6 +405,9 @@ Message *decode_message(CephContext *cct,
   case MSG_GET_CONFIG:
     m = make_message<MGetConfig>();
     break;
+  case MSG_KV_DATA:
+    m = make_message<MKVData>();
+    break;
 
   case MSG_MON_PROBE:
     m = make_message<MMonProbe>();
index a36cc4dfbc277913bb7173b52aef97faec75b1e2..8e1c4f967f05942855a6f34e9f1135a8470eb190 100644 (file)
@@ -67,6 +67,8 @@
 #define MSG_CONFIG           62
 #define MSG_GET_CONFIG       63
 
+#define MSG_KV_DATA          54
+
 #define MSG_MON_GET_PURGED_SNAPS 76
 #define MSG_MON_GET_PURGED_SNAPS_REPLY 77