]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/LogMonitor: store logentries in separate keys
authorSage Weil <sage@newdream.net>
Wed, 23 Jun 2021 20:35:51 +0000 (16:35 -0400)
committerSage Weil <sage@newdream.net>
Sat, 3 Jul 2021 18:29:40 +0000 (14:29 -0400)
- Store each log entry in a separate key, separated by channel
- Only track the log entry version bounds for each channel and recent keys
  in the LogSummary
- keep way more history (since it is now cheap to do so)

Signed-off-by: Sage Weil <sage@newdream.net>
src/common/LogEntry.cc
src/common/LogEntry.h
src/common/options/mon.yaml.in
src/mon/LogMonitor.cc
src/mon/LogMonitor.h
src/vstart.sh

index b1ba2e63e75c2fe9f86847d735f18221330e40bf..9ca0ddfb1c22784da2c43e7b04157a3a58109cd6 100644 (file)
@@ -278,7 +278,7 @@ void LogEntry::generate_test_instances(list<LogEntry*>& o)
 
 // -----
 
-void LogSummary::build_ordered_tail(list<LogEntry> *tail) const
+void LogSummary::build_ordered_tail_legacy(list<LogEntry> *tail) const
 {
   tail->clear();
   // channel -> (begin, end)
@@ -313,31 +313,37 @@ void LogSummary::encode(bufferlist& bl, uint64_t features) const
     ENCODE_START(2, 2, bl);
     encode(version, bl);
     list<LogEntry> tail;
-    build_ordered_tail(&tail);
+    build_ordered_tail_legacy(&tail);
     encode(tail, bl, features);
     ENCODE_FINISH(bl);
     return;
   }
-  ENCODE_START(3, 3, bl);
+  ENCODE_START(4, 3, bl);
   encode(version, bl);
   encode(seq, bl);
   encode(tail_by_channel, bl, features);
+  encode(channel_info, bl);
+  recent_keys.encode(bl);
   ENCODE_FINISH(bl);
 }
 
 void LogSummary::decode(bufferlist::const_iterator& bl)
 {
-  DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+  DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
   decode(version, bl);
   if (struct_v < 3) {
     list<LogEntry> tail;
     decode(tail, bl);
     for (auto& i : tail) {
-      add(i);
+      add_legacy(i);
     }
   } else {
     decode(seq, bl);
     decode(tail_by_channel, bl);
+    if (struct_v >= 4) {
+      decode(channel_info, bl);
+      recent_keys.decode(bl);
+    }
   }
   DECODE_FINISH(bl);
   keys.clear();
index c6dd326510453831a686e2ccb93f6f417d1bdb00..5da6a3210552c148301ef3c8e9f9e44c7beb11d4 100644 (file)
@@ -19,6 +19,7 @@
 #include "msg/msg_types.h"
 #include "common/entity_name.h"
 #include "ostream_temp.h"
+#include "LRUSet.h"
 
 namespace ceph {
   class Formatter;
@@ -75,7 +76,22 @@ public:
   friend bool operator==(const LogEntryKey& l, const LogEntryKey& r) {
     return l.rank == r.rank && l.stamp == r.stamp && l.seq == r.seq;
   }
+
+  void encode(bufferlist& bl) const {
+    using ceph::encode;
+    encode(rank, bl);
+    encode(stamp, bl);
+    encode(seq, bl);
+  }
+  void decode(bufferlist::const_iterator &p) {
+    using ceph::decode;
+    decode(rank, p);
+    decode(stamp, p);
+    decode(seq, p);
+  }
 };
+WRITE_CLASS_ENCODER(LogEntryKey)
+
 
 namespace std {
 template<> struct hash<LogEntryKey> {
@@ -111,16 +127,22 @@ WRITE_CLASS_ENCODER_FEATURES(LogEntry)
 
 struct LogSummary {
   version_t version;
+
+  // ---- pre-quincy ----
   // channel -> [(seq#, entry), ...]
   std::map<std::string,std::list<std::pair<uint64_t,LogEntry>>> tail_by_channel;
   uint64_t seq = 0;
   ceph::unordered_set<LogEntryKey> keys;
 
+  // ---- quincy+ ----
+  LRUSet<LogEntryKey> recent_keys;
+  std::map<std::string, pair<uint64_t,uint64_t>> channel_info; // channel -> [begin, end)
+
   LogSummary() : version(0) {}
 
-  void build_ordered_tail(std::list<LogEntry> *tail) const;
+  void build_ordered_tail_legacy(std::list<LogEntry> *tail) const;
 
-  void add(const LogEntry& e) {
+  void add_legacy(const LogEntry& e) {
     keys.insert(e.key());
     tail_by_channel[e.channel].push_back(std::make_pair(++seq, e));
   }
@@ -131,9 +153,10 @@ struct LogSummary {
        i.second.pop_front();
       }
     }
+    recent_keys.prune(max);
   }
   bool contains(const LogEntryKey& k) const {
-    return keys.count(k);
+    return keys.count(k) || recent_keys.contains(k);
   }
 
   void encode(ceph::buffer::list& bl, uint64_t features) const;
index 535dc65e26a2e0a807f3c81e43b45524f85ec78f..6e80fc4f147b6ce7256b9a80f92dcff2b1936b85 100644 (file)
@@ -201,14 +201,29 @@ options:
   - mon
   flags:
   - runtime
-- name: mon_log_max_summary
+- name: mon_log_max
   type: uint
   level: advanced
   desc: number of recent cluster log messages to retain
+  default: 10000
+  services:
+  - mon
+  with_legacy: true
+- name: mon_log_max_summary
+  type: uint
+  level: advanced
+  desc: number of recent cluster log messages to dedup against
   default: 50
   services:
   - mon
   with_legacy: true
+- name: mon_log_full_interval
+  type: uint
+  level: advanced
+  desc: how many epochs before we encode a full copy of recent log keys
+  default: 50
+  services: [mon]
+  with_legacy: true
 - name: mon_max_log_entries_per_event
   type: int
   level: advanced
index 4325e54d9d98ebe213f49430dbc9b8bbc0f6e7e2..fadb6e7a5decf43656b031456f5cbda615fe909e 100644 (file)
  * 
  */
 
+
+/*
+
+  -- Storage scheme --
+
+  Pre-quincy:
+
+    - LogSummary contains last N entries for every channel
+    - LogSummary (as "full") written on every commit
+    - LogSummary contains "keys" which LogEntryKey hash_set for the
+      same set of entries (for deduping)
+
+  Quincy+:
+
+    - LogSummary contains, for each channel,
+       - start seq
+       - end seq (last written seq + 1)
+    - LogSummary contains an LRUSet for tracking dups
+    - LogSummary written every N commits
+    - each LogEntry written in a separate key
+       - "%s/%08x" % (channel, seq) -> LogEntry
+    - per-commit record includes channel -> begin (trim bounds)
+    - 'external_log_to' meta records version to which we have logged externally
+
+*/
+
+
+
 #include <boost/algorithm/string/predicate.hpp>
 
 #include <sstream>
@@ -269,21 +297,45 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
     ceph_assert(bl.length());
 
     auto p = bl.cbegin();
-    __u8 v;
-    decode(v, p);
-    while (!p.end()) {
-      LogEntry le;
-      le.decode(p);
-      dout(7) << "update_from_paxos applying incremental log "
-             << summary.version+1 <<  " " << le << dendl;
-
-      log_external(le);
-      summary.add(le);
+    __u8 struct_v;
+    decode(struct_v, p);
+    if (struct_v == 1) {
+      // legacy pre-quincy commits
+      while (!p.end()) {
+       LogEntry le;
+       le.decode(p);
+       dout(7) << "update_from_paxos applying incremental log "
+               << summary.version+1 <<  " " << le << dendl;
+       summary.add_legacy(le);
+      }
+    } else {
+      uint32_t num;
+      decode(num, p);
+      while (num--) {
+       LogEntry le;
+       le.decode(p);
+       dout(7) << "update_from_paxos applying incremental log "
+               << summary.version+1 <<  " " << le << dendl;
+       summary.recent_keys.insert(le.key());
+       summary.channel_info[le.channel].second++;
+       // we may have logged past the (persisted) summary in a prior quorum
+       if (version > external_log_to) {
+         log_external(le);
+       }
+      }
+      map<string,version_t> prune_channels_to;
+      decode(prune_channels_to, p);
+      for (auto& [channel, prune_to] : prune_channels_to) {
+       dout(20) << __func__ << " channel " << channel
+                << " pruned to " << prune_to << dendl;
+       summary.channel_info[channel].first = prune_to;
+      }
     }
 
     summary.version++;
     summary.prune(g_conf()->mon_log_max_summary);
   }
+  dout(10) << " summary.channel_info " << summary.channel_info << dendl;
   external_log_to = version;
   mon.store->write_meta("external_log_to", stringify(external_log_to));
 
@@ -413,11 +465,14 @@ void LogMonitor::log_external_backlog()
     int err = get_version(external_log_to+1, bl);
     ceph_assert(err == 0);
     ceph_assert(bl.length());
-
     auto p = bl.cbegin();
     __u8 v;
     decode(v, p);
-    while (!p.end()) {
+    int32_t num = -2;
+    if (v >= 2) {
+      decode(num, p);
+    }
+    while ((num == -2 && !p.end()) || num--) {
       LogEntry le;
       le.decode(p);
       log_external(le);
@@ -429,8 +484,20 @@ void LogMonitor::log_external_backlog()
 void LogMonitor::create_pending()
 {
   pending_log.clear();
-  dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
   pending_keys.clear();
+  dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
+}
+
+void LogMonitor::generate_logentry_key(
+  const std::string& channel,
+  version_t v,
+  std::string *out)
+{
+  out->append(channel);
+  out->append("/");
+  char vs[10];
+  snprintf(vs, sizeof(vs), "%08llx", (unsigned long long)v);
+  out->append(vs);
 }
 
 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
@@ -438,15 +505,79 @@ void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   version_t version = get_last_committed() + 1;
   bufferlist bl;
   dout(10) << __func__ << " v" << version << dendl;
-  __u8 v = 1;
-  encode(v, bl);
-  for (auto p = pending_log.begin(); p != pending_log.end(); ++p)
-    p->second.encode(bl, mon.get_quorum_con_features());
+
+  if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
+    // legacy encoding for pre-quincy quorum
+    __u8 struct_v = 1;
+    encode(struct_v, bl);
+    for (auto& p : pending_log) {
+      p.second.encode(bl, mon.get_quorum_con_features());
+    }
+    put_version(t, version, bl);
+    put_last_committed(t, version);
+    return;
+  }
+  
+  __u8 struct_v = 2;
+  encode(struct_v, bl);
+
+  // record new entries
+  auto pending_channel_info = summary.channel_info;
+  uint32_t num = pending_log.size();
+  encode(num, bl);
+  for (auto& p : pending_log) {
+    bufferlist ebl;
+    p.second.encode(ebl, mon.get_quorum_con_features());
+
+    auto& bounds = pending_channel_info[p.second.channel];
+    version_t v = bounds.second++;
+    std::string key;
+    generate_logentry_key(p.second.channel, v, &key);
+    t->put(get_service_name(), key, ebl);
+    
+    bl.claim_append(ebl);
+  }
+
+  // prune log entries?
+  map<string,version_t> prune_channels_to;
+  for (auto& [channel, info] : summary.channel_info) {
+    if (info.second - info.first > g_conf()->mon_log_max) {
+      const version_t from = info.first;
+      const version_t to = info.second - g_conf()->mon_log_max;
+      dout(10) << __func__ << " pruning channel " << channel
+              << " " << from << " -> " << to << dendl;
+      prune_channels_to[channel] = to;
+      pending_channel_info[channel].first = to;
+      for (version_t v = from; v < to; ++v) {
+       std::string key;
+       generate_logentry_key(channel, v, &key);
+       t->erase(get_service_name(), key);
+      }
+    }
+  }
+  dout(20) << __func__ << " prune_channels_to " << prune_channels_to << dendl;
+  encode(prune_channels_to, bl);
 
   put_version(t, version, bl);
   put_last_committed(t, version);
 }
 
+bool LogMonitor::should_stash_full()
+{
+  if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
+    // commit a LogSummary on every commit
+    return true;
+  }
+
+  // store periodic summary
+  auto period = std::min<uint64_t>(
+    g_conf()->mon_log_full_interval,
+    g_conf()->mon_max_log_epochs
+    );
+  return (get_last_committed() - get_version_latest_full() > period);
+}
+
+
 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
 {
   dout(10) << __func__ << " log v " << summary.version << dendl;
@@ -669,7 +800,7 @@ bool LogMonitor::preprocess_command(MonOpRequestRef op)
     ostringstream ss;
     if (channel == "*") {
       list<LogEntry> full_tail;
-      summary.build_ordered_tail(&full_tail);
+      summary.build_ordered_tail_legacy(&full_tail);
       auto rp = full_tail.rbegin();
       for (; num > 0 && rp != full_tail.rend(); ++rp) {
        if (match(*rp)) {
@@ -907,17 +1038,19 @@ void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
     ceph_assert(bl.length());
     auto p = bl.cbegin();
     __u8 v;
-    decode(v,p);
-    while (!p.end()) {
+    decode(v, p);
+    int32_t num = -2;
+    if (v >= 2) {
+      decode(num, p);
+    }
+    while ((num == -2 && !p.end()) || num--) {
       LogEntry le;
       le.decode(p);
-
       if (le.prio < level) {
        dout(20) << __func__ << " requested " << level 
                 << " entry " << le.prio << dendl;
        continue;
       }
-
       mlog->entries.push_back(le);
     }
     mlog->version = sv++;
index 1605e67af4bc84ff99e06bf7a1aeb6da5b494709..715f380af6a4ed407ef0b5b8fafa0685fb6423c9 100644 (file)
@@ -44,7 +44,9 @@ class LogMonitor : public PaxosService,
 private:
   std::multimap<utime_t,LogEntry> pending_log;
   unordered_set<LogEntryKey> pending_keys;
+
   LogSummary summary;
+
   version_t external_log_to = 0;
   std::map<std::string, int> channel_fds;
 
@@ -128,6 +130,7 @@ private:
   void update_from_paxos(bool *need_bootstrap) override;
   void create_pending() override;  // prepare a new pending
   // propose pending update to peers
+  void generate_logentry_key(const std::string& channel, version_t v, std::string *out);
   void encode_pending(MonitorDBStore::TransactionRef t) override;
   void encode_full(MonitorDBStore::TransactionRef t) override;
   version_t get_trim_to() const override;
@@ -140,10 +143,7 @@ private:
 
   bool should_propose(double& delay) override;
 
-  bool should_stash_full() override {
-    // commit a LogSummary on every commit
-    return true;
-  }
+  bool should_stash_full() override;
 
   struct C_Log;
 
index 0d71f421639504b1d2ef609e5466cab46f1caa64..3a3f5ddfbd2561db8311ce1be46a50a72357ee06 100755 (executable)
@@ -1072,7 +1072,7 @@ EOF
         debug echo Enabling cephadm orchestrator
        if [ "$new" -eq 1 ]; then
                digest=$(curl -s \
-               https://registry.hub.docker.com/v2/repositories/ceph/daemon-base/tags/latest-master-devel \
+               https://hub.docker.com/v2/repositories/ceph/daemon-base/tags/latest-master-devel \
                | jq -r '.images[0].digest')
                ceph_adm config set global container_image "docker.io/ceph/daemon-base@$digest"
        fi