// -----
-void LogSummary::build_ordered_tail(list<LogEntry> *tail) const
+void LogSummary::build_ordered_tail_legacy(list<LogEntry> *tail) const
{
tail->clear();
// channel -> (begin, end)
ENCODE_START(2, 2, bl);
encode(version, bl);
list<LogEntry> tail;
- build_ordered_tail(&tail);
+ build_ordered_tail_legacy(&tail);
encode(tail, bl, features);
ENCODE_FINISH(bl);
return;
}
- ENCODE_START(3, 3, bl);
+ ENCODE_START(4, 3, bl);
encode(version, bl);
encode(seq, bl);
encode(tail_by_channel, bl, features);
+ encode(channel_info, bl);
+ recent_keys.encode(bl);
ENCODE_FINISH(bl);
}
void LogSummary::decode(bufferlist::const_iterator& bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
decode(version, bl);
if (struct_v < 3) {
list<LogEntry> tail;
decode(tail, bl);
for (auto& i : tail) {
- add(i);
+ add_legacy(i);
}
} else {
decode(seq, bl);
decode(tail_by_channel, bl);
+ if (struct_v >= 4) {
+ decode(channel_info, bl);
+ recent_keys.decode(bl);
+ }
}
DECODE_FINISH(bl);
keys.clear();
#include "msg/msg_types.h"
#include "common/entity_name.h"
#include "ostream_temp.h"
+#include "LRUSet.h"
namespace ceph {
class Formatter;
friend bool operator==(const LogEntryKey& l, const LogEntryKey& r) {
return l.rank == r.rank && l.stamp == r.stamp && l.seq == r.seq;
}
+
+ void encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(rank, bl);
+ encode(stamp, bl);
+ encode(seq, bl);
+ }
+ void decode(bufferlist::const_iterator &p) {
+ using ceph::decode;
+ decode(rank, p);
+ decode(stamp, p);
+ decode(seq, p);
+ }
};
+WRITE_CLASS_ENCODER(LogEntryKey)
+
namespace std {
template<> struct hash<LogEntryKey> {
struct LogSummary {
version_t version;
+
+ // ---- pre-quincy ----
// channel -> [(seq#, entry), ...]
std::map<std::string,std::list<std::pair<uint64_t,LogEntry>>> tail_by_channel;
uint64_t seq = 0;
ceph::unordered_set<LogEntryKey> keys;
+ // ---- quincy+ ----
+ LRUSet<LogEntryKey> recent_keys;
+ std::map<std::string, pair<uint64_t,uint64_t>> channel_info; // channel -> [begin, end)
+
LogSummary() : version(0) {}
- void build_ordered_tail(std::list<LogEntry> *tail) const;
+ void build_ordered_tail_legacy(std::list<LogEntry> *tail) const;
- void add(const LogEntry& e) {
+ void add_legacy(const LogEntry& e) {
keys.insert(e.key());
tail_by_channel[e.channel].push_back(std::make_pair(++seq, e));
}
i.second.pop_front();
}
}
+ recent_keys.prune(max);
}
bool contains(const LogEntryKey& k) const {
- return keys.count(k);
+ return keys.count(k) || recent_keys.contains(k);
}
void encode(ceph::buffer::list& bl, uint64_t features) const;
*
*/
+
+/*
+
+ -- Storage scheme --
+
+ Pre-quincy:
+
+ - LogSummary contains last N entries for every channel
+ - LogSummary (as "full") written on every commit
+ - LogSummary contains "keys" which LogEntryKey hash_set for the
+ same set of entries (for deduping)
+
+ Quincy+:
+
+ - LogSummary contains, for each channel,
+ - start seq
+ - end seq (last written seq + 1)
+ - LogSummary contains an LRUSet for tracking dups
+ - LogSummary written every N commits
+ - each LogEntry written in a separate key
+ - "%s/%08x" % (channel, seq) -> LogEntry
+ - per-commit record includes channel -> begin (trim bounds)
+ - 'external_log_to' meta records version to which we have logged externally
+
+*/
+
+
+
#include <boost/algorithm/string/predicate.hpp>
#include <sstream>
ceph_assert(bl.length());
auto p = bl.cbegin();
- __u8 v;
- decode(v, p);
- while (!p.end()) {
- LogEntry le;
- le.decode(p);
- dout(7) << "update_from_paxos applying incremental log "
- << summary.version+1 << " " << le << dendl;
-
- log_external(le);
- summary.add(le);
+ __u8 struct_v;
+ decode(struct_v, p);
+ if (struct_v == 1) {
+ // legacy pre-quincy commits
+ while (!p.end()) {
+ LogEntry le;
+ le.decode(p);
+ dout(7) << "update_from_paxos applying incremental log "
+ << summary.version+1 << " " << le << dendl;
+ summary.add_legacy(le);
+ }
+ } else {
+ uint32_t num;
+ decode(num, p);
+ while (num--) {
+ LogEntry le;
+ le.decode(p);
+ dout(7) << "update_from_paxos applying incremental log "
+ << summary.version+1 << " " << le << dendl;
+ summary.recent_keys.insert(le.key());
+ summary.channel_info[le.channel].second++;
+ // we may have logged past the (persisted) summary in a prior quorum
+ if (version > external_log_to) {
+ log_external(le);
+ }
+ }
+ map<string,version_t> prune_channels_to;
+ decode(prune_channels_to, p);
+ for (auto& [channel, prune_to] : prune_channels_to) {
+ dout(20) << __func__ << " channel " << channel
+ << " pruned to " << prune_to << dendl;
+ summary.channel_info[channel].first = prune_to;
+ }
}
summary.version++;
summary.prune(g_conf()->mon_log_max_summary);
}
+ dout(10) << " summary.channel_info " << summary.channel_info << dendl;
external_log_to = version;
mon.store->write_meta("external_log_to", stringify(external_log_to));
int err = get_version(external_log_to+1, bl);
ceph_assert(err == 0);
ceph_assert(bl.length());
-
auto p = bl.cbegin();
__u8 v;
decode(v, p);
- while (!p.end()) {
+ int32_t num = -2;
+ if (v >= 2) {
+ decode(num, p);
+ }
+ while ((num == -2 && !p.end()) || num--) {
LogEntry le;
le.decode(p);
log_external(le);
void LogMonitor::create_pending()
{
pending_log.clear();
- dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
pending_keys.clear();
+ dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
+}
+
+void LogMonitor::generate_logentry_key(
+ const std::string& channel,
+ version_t v,
+ std::string *out)
+{
+ out->append(channel);
+ out->append("/");
+ char vs[10];
+ snprintf(vs, sizeof(vs), "%08llx", (unsigned long long)v);
+ out->append(vs);
}
void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
version_t version = get_last_committed() + 1;
bufferlist bl;
dout(10) << __func__ << " v" << version << dendl;
- __u8 v = 1;
- encode(v, bl);
- for (auto p = pending_log.begin(); p != pending_log.end(); ++p)
- p->second.encode(bl, mon.get_quorum_con_features());
+
+ if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
+ // legacy encoding for pre-quincy quorum
+ __u8 struct_v = 1;
+ encode(struct_v, bl);
+ for (auto& p : pending_log) {
+ p.second.encode(bl, mon.get_quorum_con_features());
+ }
+ put_version(t, version, bl);
+ put_last_committed(t, version);
+ return;
+ }
+
+ __u8 struct_v = 2;
+ encode(struct_v, bl);
+
+ // record new entries
+ auto pending_channel_info = summary.channel_info;
+ uint32_t num = pending_log.size();
+ encode(num, bl);
+ for (auto& p : pending_log) {
+ bufferlist ebl;
+ p.second.encode(ebl, mon.get_quorum_con_features());
+
+ auto& bounds = pending_channel_info[p.second.channel];
+ version_t v = bounds.second++;
+ std::string key;
+ generate_logentry_key(p.second.channel, v, &key);
+ t->put(get_service_name(), key, ebl);
+
+ bl.claim_append(ebl);
+ }
+
+ // prune log entries?
+ map<string,version_t> prune_channels_to;
+ for (auto& [channel, info] : summary.channel_info) {
+ if (info.second - info.first > g_conf()->mon_log_max) {
+ const version_t from = info.first;
+ const version_t to = info.second - g_conf()->mon_log_max;
+ dout(10) << __func__ << " pruning channel " << channel
+ << " " << from << " -> " << to << dendl;
+ prune_channels_to[channel] = to;
+ pending_channel_info[channel].first = to;
+ for (version_t v = from; v < to; ++v) {
+ std::string key;
+ generate_logentry_key(channel, v, &key);
+ t->erase(get_service_name(), key);
+ }
+ }
+ }
+ dout(20) << __func__ << " prune_channels_to " << prune_channels_to << dendl;
+ encode(prune_channels_to, bl);
put_version(t, version, bl);
put_last_committed(t, version);
}
+bool LogMonitor::should_stash_full()
+{
+ if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
+ // commit a LogSummary on every commit
+ return true;
+ }
+
+ // store periodic summary
+ auto period = std::min<uint64_t>(
+ g_conf()->mon_log_full_interval,
+ g_conf()->mon_max_log_epochs
+ );
+ return (get_last_committed() - get_version_latest_full() > period);
+}
+
+
void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
{
dout(10) << __func__ << " log v " << summary.version << dendl;
ostringstream ss;
if (channel == "*") {
list<LogEntry> full_tail;
- summary.build_ordered_tail(&full_tail);
+ summary.build_ordered_tail_legacy(&full_tail);
auto rp = full_tail.rbegin();
for (; num > 0 && rp != full_tail.rend(); ++rp) {
if (match(*rp)) {
ceph_assert(bl.length());
auto p = bl.cbegin();
__u8 v;
- decode(v,p);
- while (!p.end()) {
+ decode(v, p);
+ int32_t num = -2;
+ if (v >= 2) {
+ decode(num, p);
+ }
+ while ((num == -2 && !p.end()) || num--) {
LogEntry le;
le.decode(p);
-
if (le.prio < level) {
dout(20) << __func__ << " requested " << level
<< " entry " << le.prio << dendl;
continue;
}
-
mlog->entries.push_back(le);
}
mlog->version = sv++;