From: Sage Weil Date: Wed, 23 Jun 2021 20:35:51 +0000 (-0400) Subject: mon/LogMonitor: store logentries in separate keys X-Git-Tag: v17.1.0~1423^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=52a53366752f9ba5cf76292d0cf5760b11be9ea1;p=ceph.git mon/LogMonitor: store logentries in separate keys - Store each log entry in a separate key, separated by channel - Only track the log entry version bounds for each channel and recent keys in the LogSummary - keep way more history (since it is now cheap to do so) Signed-off-by: Sage Weil --- diff --git a/src/common/LogEntry.cc b/src/common/LogEntry.cc index b1ba2e63e75c..9ca0ddfb1c22 100644 --- a/src/common/LogEntry.cc +++ b/src/common/LogEntry.cc @@ -278,7 +278,7 @@ void LogEntry::generate_test_instances(list& o) // ----- -void LogSummary::build_ordered_tail(list *tail) const +void LogSummary::build_ordered_tail_legacy(list *tail) const { tail->clear(); // channel -> (begin, end) @@ -313,31 +313,37 @@ void LogSummary::encode(bufferlist& bl, uint64_t features) const ENCODE_START(2, 2, bl); encode(version, bl); list tail; - build_ordered_tail(&tail); + build_ordered_tail_legacy(&tail); encode(tail, bl, features); ENCODE_FINISH(bl); return; } - ENCODE_START(3, 3, bl); + ENCODE_START(4, 3, bl); encode(version, bl); encode(seq, bl); encode(tail_by_channel, bl, features); + encode(channel_info, bl); + recent_keys.encode(bl); ENCODE_FINISH(bl); } void LogSummary::decode(bufferlist::const_iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl); decode(version, bl); if (struct_v < 3) { list tail; decode(tail, bl); for (auto& i : tail) { - add(i); + add_legacy(i); } } else { decode(seq, bl); decode(tail_by_channel, bl); + if (struct_v >= 4) { + decode(channel_info, bl); + recent_keys.decode(bl); + } } DECODE_FINISH(bl); keys.clear(); diff --git a/src/common/LogEntry.h b/src/common/LogEntry.h index c6dd32651045..5da6a3210552 100644 --- a/src/common/LogEntry.h +++ b/src/common/LogEntry.h @@ -19,6 +19,7 @@ #include "msg/msg_types.h" #include "common/entity_name.h" #include "ostream_temp.h" +#include "LRUSet.h" namespace ceph { class Formatter; @@ -75,7 +76,22 @@ public: friend bool operator==(const LogEntryKey& l, const LogEntryKey& r) { return l.rank == r.rank && l.stamp == r.stamp && l.seq == r.seq; } + + void encode(bufferlist& bl) const { + using ceph::encode; + encode(rank, bl); + encode(stamp, bl); + encode(seq, bl); + } + void decode(bufferlist::const_iterator &p) { + using ceph::decode; + decode(rank, p); + decode(stamp, p); + decode(seq, p); + } }; +WRITE_CLASS_ENCODER(LogEntryKey) + namespace std { template<> struct hash { @@ -111,16 +127,22 @@ WRITE_CLASS_ENCODER_FEATURES(LogEntry) struct LogSummary { version_t version; + + // ---- pre-quincy ---- // channel -> [(seq#, entry), ...] std::map>> tail_by_channel; uint64_t seq = 0; ceph::unordered_set keys; + // ---- quincy+ ---- + LRUSet recent_keys; + std::map> channel_info; // channel -> [begin, end) + LogSummary() : version(0) {} - void build_ordered_tail(std::list *tail) const; + void build_ordered_tail_legacy(std::list *tail) const; - void add(const LogEntry& e) { + void add_legacy(const LogEntry& e) { keys.insert(e.key()); tail_by_channel[e.channel].push_back(std::make_pair(++seq, e)); } @@ -131,9 +153,10 @@ struct LogSummary { i.second.pop_front(); } } + recent_keys.prune(max); } bool contains(const LogEntryKey& k) const { - return keys.count(k); + return keys.count(k) || recent_keys.contains(k); } void encode(ceph::buffer::list& bl, uint64_t features) const; diff --git a/src/common/options/mon.yaml.in b/src/common/options/mon.yaml.in index 535dc65e26a2..6e80fc4f147b 100644 --- a/src/common/options/mon.yaml.in +++ b/src/common/options/mon.yaml.in @@ -201,14 +201,29 @@ options: - mon flags: - runtime -- name: mon_log_max_summary +- name: mon_log_max type: uint level: advanced desc: number of recent cluster log messages to retain + default: 10000 + services: + - mon + with_legacy: true +- name: mon_log_max_summary + type: uint + level: advanced + desc: number of recent cluster log messages to dedup against default: 50 services: - mon with_legacy: true +- name: mon_log_full_interval + type: uint + level: advanced + desc: how many epochs before we encode a full copy of recent log keys + default: 50 + services: [mon] + with_legacy: true - name: mon_max_log_entries_per_event type: int level: advanced diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc index 4325e54d9d98..fadb6e7a5dec 100644 --- a/src/mon/LogMonitor.cc +++ b/src/mon/LogMonitor.cc @@ -12,6 +12,34 @@ * */ + +/* + + -- Storage scheme -- + + Pre-quincy: + + - LogSummary contains last N entries for every channel + - LogSummary (as "full") written on every commit + - LogSummary contains "keys" which LogEntryKey hash_set for the + same set of entries (for deduping) + + Quincy+: + + - LogSummary contains, for each channel, + - start seq + - end seq (last written seq + 1) + - LogSummary contains an LRUSet for tracking dups + - LogSummary written every N commits + - each LogEntry written in a separate key + - "%s/%08x" % (channel, seq) -> LogEntry + - per-commit record includes channel -> begin (trim bounds) + - 'external_log_to' meta records version to which we have logged externally + +*/ + + + #include #include @@ -269,21 +297,45 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap) ceph_assert(bl.length()); auto p = bl.cbegin(); - __u8 v; - decode(v, p); - while (!p.end()) { - LogEntry le; - le.decode(p); - dout(7) << "update_from_paxos applying incremental log " - << summary.version+1 << " " << le << dendl; - - log_external(le); - summary.add(le); + __u8 struct_v; + decode(struct_v, p); + if (struct_v == 1) { + // legacy pre-quincy commits + while (!p.end()) { + LogEntry le; + le.decode(p); + dout(7) << "update_from_paxos applying incremental log " + << summary.version+1 << " " << le << dendl; + summary.add_legacy(le); + } + } else { + uint32_t num; + decode(num, p); + while (num--) { + LogEntry le; + le.decode(p); + dout(7) << "update_from_paxos applying incremental log " + << summary.version+1 << " " << le << dendl; + summary.recent_keys.insert(le.key()); + summary.channel_info[le.channel].second++; + // we may have logged past the (persisted) summary in a prior quorum + if (version > external_log_to) { + log_external(le); + } + } + map prune_channels_to; + decode(prune_channels_to, p); + for (auto& [channel, prune_to] : prune_channels_to) { + dout(20) << __func__ << " channel " << channel + << " pruned to " << prune_to << dendl; + summary.channel_info[channel].first = prune_to; + } } summary.version++; summary.prune(g_conf()->mon_log_max_summary); } + dout(10) << " summary.channel_info " << summary.channel_info << dendl; external_log_to = version; mon.store->write_meta("external_log_to", stringify(external_log_to)); @@ -413,11 +465,14 @@ void LogMonitor::log_external_backlog() int err = get_version(external_log_to+1, bl); ceph_assert(err == 0); ceph_assert(bl.length()); - auto p = bl.cbegin(); __u8 v; decode(v, p); - while (!p.end()) { + int32_t num = -2; + if (v >= 2) { + decode(num, p); + } + while ((num == -2 && !p.end()) || num--) { LogEntry le; le.decode(p); log_external(le); @@ -429,8 +484,20 @@ void LogMonitor::log_external_backlog() void LogMonitor::create_pending() { pending_log.clear(); - dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; pending_keys.clear(); + dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; +} + +void LogMonitor::generate_logentry_key( + const std::string& channel, + version_t v, + std::string *out) +{ + out->append(channel); + out->append("/"); + char vs[10]; + snprintf(vs, sizeof(vs), "%08llx", (unsigned long long)v); + out->append(vs); } void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t) @@ -438,15 +505,79 @@ void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t) version_t version = get_last_committed() + 1; bufferlist bl; dout(10) << __func__ << " v" << version << dendl; - __u8 v = 1; - encode(v, bl); - for (auto p = pending_log.begin(); p != pending_log.end(); ++p) - p->second.encode(bl, mon.get_quorum_con_features()); + + if (mon.monmap->min_mon_release < ceph_release_t::quincy) { + // legacy encoding for pre-quincy quorum + __u8 struct_v = 1; + encode(struct_v, bl); + for (auto& p : pending_log) { + p.second.encode(bl, mon.get_quorum_con_features()); + } + put_version(t, version, bl); + put_last_committed(t, version); + return; + } + + __u8 struct_v = 2; + encode(struct_v, bl); + + // record new entries + auto pending_channel_info = summary.channel_info; + uint32_t num = pending_log.size(); + encode(num, bl); + for (auto& p : pending_log) { + bufferlist ebl; + p.second.encode(ebl, mon.get_quorum_con_features()); + + auto& bounds = pending_channel_info[p.second.channel]; + version_t v = bounds.second++; + std::string key; + generate_logentry_key(p.second.channel, v, &key); + t->put(get_service_name(), key, ebl); + + bl.claim_append(ebl); + } + + // prune log entries? + map prune_channels_to; + for (auto& [channel, info] : summary.channel_info) { + if (info.second - info.first > g_conf()->mon_log_max) { + const version_t from = info.first; + const version_t to = info.second - g_conf()->mon_log_max; + dout(10) << __func__ << " pruning channel " << channel + << " " << from << " -> " << to << dendl; + prune_channels_to[channel] = to; + pending_channel_info[channel].first = to; + for (version_t v = from; v < to; ++v) { + std::string key; + generate_logentry_key(channel, v, &key); + t->erase(get_service_name(), key); + } + } + } + dout(20) << __func__ << " prune_channels_to " << prune_channels_to << dendl; + encode(prune_channels_to, bl); put_version(t, version, bl); put_last_committed(t, version); } +bool LogMonitor::should_stash_full() +{ + if (mon.monmap->min_mon_release < ceph_release_t::quincy) { + // commit a LogSummary on every commit + return true; + } + + // store periodic summary + auto period = std::min( + g_conf()->mon_log_full_interval, + g_conf()->mon_max_log_epochs + ); + return (get_last_committed() - get_version_latest_full() > period); +} + + void LogMonitor::encode_full(MonitorDBStore::TransactionRef t) { dout(10) << __func__ << " log v " << summary.version << dendl; @@ -669,7 +800,7 @@ bool LogMonitor::preprocess_command(MonOpRequestRef op) ostringstream ss; if (channel == "*") { list full_tail; - summary.build_ordered_tail(&full_tail); + summary.build_ordered_tail_legacy(&full_tail); auto rp = full_tail.rbegin(); for (; num > 0 && rp != full_tail.rend(); ++rp) { if (match(*rp)) { @@ -907,17 +1038,19 @@ void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv) ceph_assert(bl.length()); auto p = bl.cbegin(); __u8 v; - decode(v,p); - while (!p.end()) { + decode(v, p); + int32_t num = -2; + if (v >= 2) { + decode(num, p); + } + while ((num == -2 && !p.end()) || num--) { LogEntry le; le.decode(p); - if (le.prio < level) { dout(20) << __func__ << " requested " << level << " entry " << le.prio << dendl; continue; } - mlog->entries.push_back(le); } mlog->version = sv++; diff --git a/src/mon/LogMonitor.h b/src/mon/LogMonitor.h index 1605e67af4bc..715f380af6a4 100644 --- a/src/mon/LogMonitor.h +++ b/src/mon/LogMonitor.h @@ -44,7 +44,9 @@ class LogMonitor : public PaxosService, private: std::multimap pending_log; unordered_set pending_keys; + LogSummary summary; + version_t external_log_to = 0; std::map channel_fds; @@ -128,6 +130,7 @@ private: void update_from_paxos(bool *need_bootstrap) override; void create_pending() override; // prepare a new pending // propose pending update to peers + void generate_logentry_key(const std::string& channel, version_t v, std::string *out); void encode_pending(MonitorDBStore::TransactionRef t) override; void encode_full(MonitorDBStore::TransactionRef t) override; version_t get_trim_to() const override; @@ -140,10 +143,7 @@ private: bool should_propose(double& delay) override; - bool should_stash_full() override { - // commit a LogSummary on every commit - return true; - } + bool should_stash_full() override; struct C_Log; diff --git a/src/vstart.sh b/src/vstart.sh index 0d71f4216395..3a3f5ddfbd25 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -1072,7 +1072,7 @@ EOF debug echo Enabling cephadm orchestrator if [ "$new" -eq 1 ]; then digest=$(curl -s \ - https://registry.hub.docker.com/v2/repositories/ceph/daemon-base/tags/latest-master-devel \ + https://hub.docker.com/v2/repositories/ceph/daemon-base/tags/latest-master-devel \ | jq -r '.images[0].digest') ceph_adm config set global container_image "docker.io/ceph/daemon-base@$digest" fi