]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: avoid duplicate log entries
authorSage Weil <sage@newdream.net>
Wed, 22 Apr 2009 22:35:36 +0000 (15:35 -0700)
committerSage Weil <sage@newdream.net>
Wed, 22 Apr 2009 22:35:36 +0000 (15:35 -0700)
Also avoid dumping the whole log on 'ceph -w' startup.

src/ceph.cc
src/include/LogEntry.h
src/mon/LogMonitor.cc
src/mon/LogMonitor.h

index 439594711130ad80b321cece085e022f42a4c3dd..eff440dfe0af007f3fd6a1b19eba157c6e353727 100644 (file)
@@ -160,11 +160,19 @@ void handle_notify(MMonObserveNotify *notify)
 
   case PAXOS_LOG:
     {
-      LogEntry le;
       bufferlist::iterator p = notify->bl.begin();
-      while (!p.end()) {
-       le.decode(p);
-       dout(0) << "   log " << le << dendl;
+      if (notify->is_latest) {
+       LogSummary summary;
+       ::decode(summary, p);
+       // show last log message
+       if (!summary.tail.empty())
+         dout(0) << "   log " << summary.tail.back() << dendl;
+      } else {
+       LogEntry le;
+       while (!p.end()) {
+         le.decode(p);
+         dout(0) << "   log " << le << dendl;
+       }
       }
       break;
     }
index 0717db427b649afc1b4adc8103272b2b4d8a4b34..3d3ca2cb76131b3b35374a75b99322fca8ea024d 100644 (file)
@@ -26,6 +26,31 @@ typedef enum {
   LOG_ERROR = 4,
 } log_type;
 
+struct LogEntryKey {
+  entity_inst_t who;
+  utime_t stamp;
+  __u64 seq;
+
+  LogEntryKey() {}
+  LogEntryKey(entity_inst_t w, utime_t t, __u64 s) : who(w), stamp(t), seq(s) {}
+
+  void encode(bufferlist& bl) const {
+    ::encode(who, bl);
+    ::encode(stamp, bl);
+    ::encode(seq, bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    ::decode(who, bl);
+    ::decode(stamp, bl);
+    ::decode(seq, bl);
+  }
+};
+WRITE_CLASS_ENCODER(LogEntryKey)
+
+static inline bool operator==(const LogEntryKey& l, const LogEntryKey& r) {
+  return l.who == r.who && l.stamp == r.stamp && l.seq == r.seq;
+}
+
 struct LogEntry {
   entity_inst_t who;
   utime_t stamp;
@@ -33,6 +58,8 @@ struct LogEntry {
   log_type type;
   string msg;
 
+  LogEntryKey key() const { return LogEntryKey(who, stamp, seq); }
+
   void encode(bufferlist& bl) const {
     __u16 t = type;
     ::encode(who, bl);
@@ -53,6 +80,36 @@ struct LogEntry {
 };
 WRITE_CLASS_ENCODER(LogEntry)
 
+struct LogSummary {
+  version_t version;
+  list<LogEntry> tail;
+
+  LogSummary() : version(0) {}
+
+  void add(const LogEntry& e) {
+    tail.push_back(e);
+    while (tail.size() > 50)
+      tail.pop_front();
+  }
+  bool contains(LogEntryKey k) const {
+    for (list<LogEntry>::const_iterator p = tail.begin();
+        p != tail.end();
+        p++)
+      if (p->key() == k) return true;
+    return false;
+  }
+
+  void encode(bufferlist& bl) const {
+    ::encode(version, bl);
+    ::encode(tail, bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    ::decode(version, bl);
+    ::decode(tail, bl);
+  }
+};
+WRITE_CLASS_ENCODER(LogSummary)
+
 inline ostream& operator<<(ostream& out, const log_type& t)
 {
   switch (t) {
index 9e235929df74518d2101a87be4d4d603785526cc..5c9ca4bde278bfac6681b63565c29bcc28a7426d 100644 (file)
@@ -93,12 +93,8 @@ bool LogMonitor::update_from_paxos()
 {
   version_t paxosv = paxos->get_version();
 
-  if (paxosv == log_version) return true;
-  assert(paxosv >= log_version);
-
-  if (log_version == 0 && paxosv > 1) {
-    log_version = mon->store->get_int("logm", "last_consumed");
-  }
+  if (paxosv == summary.version) return true;
+  assert(paxosv >= summary.version);
 
   bufferlist blog;
   bufferlist blogdebug;
@@ -107,17 +103,28 @@ bool LogMonitor::update_from_paxos()
   bufferlist blogerr;
   bufferlist blogsec;
 
+  if (summary.version == 0 && paxosv > 1) {
+    // startup: just load latest full map
+    bufferlist latest;
+    version_t v = paxos->get_latest(latest);
+    if (v) {
+      dout(7) << "update_from_paxos startup: loading summary e" << v << dendl;
+      bufferlist::iterator p = latest.begin();
+      ::decode(summary, p);
+    }
+  } 
+
   // walk through incrementals
-  while (paxosv > log_version) {
+  while (paxosv > summary.version) {
     bufferlist bl;
-    bool success = paxos->read(log_version+1, bl);
+    bool success = paxos->read(summary.version+1, bl);
     assert(success);
 
     bufferlist::iterator p = bl.begin();
     while (!p.end()) {
       LogEntry le;
       le.decode(p);
-      dout(7) << "update_from_paxos applying incremental log " << log_version+1 <<  " " << le << dendl;
+      dout(7) << "update_from_paxos applying incremental log " << summary.version+1 <<  " " << le << dendl;
 
       stringstream ss;
       ss << le;
@@ -138,11 +145,17 @@ bool LogMonitor::update_from_paxos()
        blogerr.append(s);
       if (le.type >= LOG_ERROR)
        blogerr.append(s);
+
+      summary.add(le);
     }
 
-    log_version++;
+    summary.version++;
   }
-  
+
+  bufferlist bl;
+  ::encode(summary, bl);
+  paxos->stash_latest(paxosv, bl);
   if (blog.length())
     mon->store->append_bl_ss(blog, "log", NULL);
   if (blogdebug.length())
@@ -156,14 +169,13 @@ bool LogMonitor::update_from_paxos()
   if (blogerr.length())
     mon->store->append_bl_ss(blogerr, "log.err", NULL);
 
-  mon->store->put_int(paxosv, "logm", "last_consumed");
-
   return true;
 }
 
 void LogMonitor::create_pending()
 {
   pending_inc.clear();
+  pending_summary = summary;
   dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl;
 }
 
@@ -212,6 +224,19 @@ void LogMonitor::committed()
 
 bool LogMonitor::preprocess_log(MLog *m)
 {
+  dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
+  
+  int num_new = 0;
+  for (deque<LogEntry>::iterator p = m->entries.begin();
+       p != m->entries.end();
+       p++) {
+    if (!pending_summary.contains(p->key()))
+      num_new++;
+  }
+  if (!num_new) {
+    dout(10) << "  nothing new" << dendl;
+    return true;
+  }
   return false;
 }
 
@@ -229,7 +254,10 @@ bool LogMonitor::prepare_log(MLog *m)
        p != m->entries.end();
        p++) {
     dout(10) << " logging " << *p << dendl;
-    (*p).encode(pending_inc);
+    if (!pending_summary.contains(p->key())) {
+      pending_summary.add(*p);
+      (*p).encode(pending_inc);
+    }
   }
 
   paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
index d5c09664cdc42d9ba768735d923050644cc0d94e..7a465e5b046ed9a1eb82567bc7cd9b7a10e66c3a 100644 (file)
@@ -31,7 +31,7 @@ class MLog;
 class LogMonitor : public PaxosService {
 private:
   bufferlist pending_inc;
-  version_t log_version;
+  LogSummary pending_summary, summary;
 
   void create_initial(bufferlist& bl);
   bool update_from_paxos();
@@ -61,7 +61,7 @@ private:
   bool prepare_command(MMonCommand *m);
 
  public:
-  LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), log_version(0) { }
+  LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
   
   void tick();  // check state, take actions
 };