LOG_ERROR = 4,
} log_type;
+struct LogEntryKey {
+ entity_inst_t who;
+ utime_t stamp;
+ __u64 seq;
+
+ LogEntryKey() {}
+ LogEntryKey(entity_inst_t w, utime_t t, __u64 s) : who(w), stamp(t), seq(s) {}
+
+ void encode(bufferlist& bl) const {
+ ::encode(who, bl);
+ ::encode(stamp, bl);
+ ::encode(seq, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ ::decode(who, bl);
+ ::decode(stamp, bl);
+ ::decode(seq, bl);
+ }
+};
+WRITE_CLASS_ENCODER(LogEntryKey)
+
+static inline bool operator==(const LogEntryKey& l, const LogEntryKey& r) {
+ return l.who == r.who && l.stamp == r.stamp && l.seq == r.seq;
+}
+
struct LogEntry {
entity_inst_t who;
utime_t stamp;
log_type type;
string msg;
+ LogEntryKey key() const { return LogEntryKey(who, stamp, seq); }
+
void encode(bufferlist& bl) const {
__u16 t = type;
::encode(who, bl);
};
WRITE_CLASS_ENCODER(LogEntry)
+struct LogSummary {
+ version_t version;
+ list<LogEntry> tail;
+
+ LogSummary() : version(0) {}
+
+ void add(const LogEntry& e) {
+ tail.push_back(e);
+ while (tail.size() > 50)
+ tail.pop_front();
+ }
+ bool contains(LogEntryKey k) const {
+ for (list<LogEntry>::const_iterator p = tail.begin();
+ p != tail.end();
+ p++)
+ if (p->key() == k) return true;
+ return false;
+ }
+
+ void encode(bufferlist& bl) const {
+ ::encode(version, bl);
+ ::encode(tail, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ ::decode(version, bl);
+ ::decode(tail, bl);
+ }
+};
+WRITE_CLASS_ENCODER(LogSummary)
+
inline ostream& operator<<(ostream& out, const log_type& t)
{
switch (t) {
{
version_t paxosv = paxos->get_version();
- if (paxosv == log_version) return true;
- assert(paxosv >= log_version);
-
- if (log_version == 0 && paxosv > 1) {
- log_version = mon->store->get_int("logm", "last_consumed");
- }
+ if (paxosv == summary.version) return true;
+ assert(paxosv >= summary.version);
bufferlist blog;
bufferlist blogdebug;
bufferlist blogerr;
bufferlist blogsec;
+ if (summary.version == 0 && paxosv > 1) {
+ // startup: just load latest full map
+ bufferlist latest;
+ version_t v = paxos->get_latest(latest);
+ if (v) {
+ dout(7) << "update_from_paxos startup: loading summary e" << v << dendl;
+ bufferlist::iterator p = latest.begin();
+ ::decode(summary, p);
+ }
+ }
+
// walk through incrementals
- while (paxosv > log_version) {
+ while (paxosv > summary.version) {
bufferlist bl;
- bool success = paxos->read(log_version+1, bl);
+ bool success = paxos->read(summary.version+1, bl);
assert(success);
bufferlist::iterator p = bl.begin();
while (!p.end()) {
LogEntry le;
le.decode(p);
- dout(7) << "update_from_paxos applying incremental log " << log_version+1 << " " << le << dendl;
+ dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl;
stringstream ss;
ss << le;
blogerr.append(s);
if (le.type >= LOG_ERROR)
blogerr.append(s);
+
+ summary.add(le);
}
- log_version++;
+ summary.version++;
}
-
+
+ bufferlist bl;
+ ::encode(summary, bl);
+ paxos->stash_latest(paxosv, bl);
+
if (blog.length())
mon->store->append_bl_ss(blog, "log", NULL);
if (blogdebug.length())
if (blogerr.length())
mon->store->append_bl_ss(blogerr, "log.err", NULL);
- mon->store->put_int(paxosv, "logm", "last_consumed");
-
return true;
}
void LogMonitor::create_pending()
{
pending_inc.clear();
+ pending_summary = summary;
dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl;
}
bool LogMonitor::preprocess_log(MLog *m)
{
+ dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
+
+ int num_new = 0;
+ for (deque<LogEntry>::iterator p = m->entries.begin();
+ p != m->entries.end();
+ p++) {
+ if (!pending_summary.contains(p->key()))
+ num_new++;
+ }
+ if (!num_new) {
+ dout(10) << " nothing new" << dendl;
+ return true;
+ }
return false;
}
p != m->entries.end();
p++) {
dout(10) << " logging " << *p << dendl;
- (*p).encode(pending_inc);
+ if (!pending_summary.contains(p->key())) {
+ pending_summary.add(*p);
+ (*p).encode(pending_inc);
+ }
}
paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
class LogMonitor : public PaxosService {
private:
bufferlist pending_inc;
- version_t log_version;
+ LogSummary pending_summary, summary;
void create_initial(bufferlist& bl);
bool update_from_paxos();
bool prepare_command(MMonCommand *m);
public:
- LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), log_version(0) { }
+ LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
void tick(); // check state, take actions
};