]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
logmonitor: append all notifications in a single file
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 5 Dec 2008 00:14:55 +0000 (16:14 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 5 Dec 2008 00:15:55 +0000 (16:15 -0800)
src/common/LogClient.cc
src/common/LogClient.h
src/mon/LogMonitor.cc
src/mon/LogMonitor.h
src/mon/MonitorStore.cc
src/mon/MonitorStore.h

index e88eab4e06e561b186628196e2bc25a87a0e2dd9..d618ed3b69bcaa746672d8b09d8bade65da04995 100644 (file)
 
 #include "config.h"
 
+void LogClient::log(log_type type, const char *s)
+{
+  string str(s);
+  log(type, str);
+}
+
 void LogClient::log(log_type type, stringstream& ss)
 {
   while (!ss.eof()) {
index fec14908e2f9b1a84a664112755a3abfd12c62ea..a3ce141bdd33e0411c5c3753a82c6ff421783a71 100644 (file)
@@ -39,6 +39,7 @@ class LogClient : public Dispatcher {
   deque<LogEntry> log_queue;
   version_t last_log;
 
+  void log(log_type type, const char *s);
   void log(log_type type, string& s);
   void log(log_type type, stringstream& s);
   void send_log();
index 87bd37ff93cefeaea7b25f56589c0706e87f497a..d6c51769382d30238fedb1b6739d6c98fdd21cd2 100644 (file)
@@ -85,16 +85,44 @@ void LogMonitor::create_initial()
   e.type = LOG_INFO;
   e.msg = "mkfs";
   e.seq = 0;
-  stringstream ss;
-  ss << e;
-  string s;
-  getline(ss, s);
-  pending_inc.append(s);
-  pending_inc.append("\n");
+  e.encode(pending_inc);
 }
 
 bool LogMonitor::update_from_paxos()
 {
+  version_t paxosv = paxos->get_version();
+
+  if (paxosv == log_version) return true;
+  assert(paxosv >= log_version);
+
+  if (log_version == 0 && paxosv > 1) {
+    log_version = mon->store->get_int("log", "last_consumed");
+  }
+
+  // walk through incrementals
+  while (paxosv > log_version) {
+    bufferlist bl, new_bl;
+    LogEntry le;
+    bool success = paxos->read(log_version+1, bl);
+    assert(success);
+    bufferlist::iterator p = bl.begin();
+
+    le.decode(p);
+    dout(7) << "update_from_paxos applying incremental log " << log_version+1 <<  " " << le << dendl;
+
+    stringstream ss;
+    ss << le;
+    string s;
+    getline(ss, s);
+    new_bl.append(s);
+    new_bl.append("\n");
+    mon->store->append_bl_ss(new_bl, "out", NULL);
+
+    log_version++;
+  }
+
+  mon->store->put_int(paxosv, "log", "last_consumed");
+
   return true;
 }
 
@@ -166,12 +194,7 @@ bool LogMonitor::prepare_log(MLog *m)
        p != m->entries.end();
        p++) {
     dout(10) << " logging " << *p << dendl;
-    stringstream ss;
-    ss << *p;
-    string s;
-    getline(ss, s);
-    pending_inc.append(s);
-    pending_inc.append("\n");
+    (*p).encode(pending_inc);
   }
 
   paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
index 7d9eb7c91d75b704eda09fb12f917bf6d41055d6..05490eab899de2a3b87721a26f4fef6214785aba 100644 (file)
@@ -31,6 +31,7 @@ class MLog;
 class LogMonitor : public PaxosService {
 private:
   bufferlist pending_inc;
+  version_t log_version;
 
   void create_initial();
   bool update_from_paxos();
@@ -60,7 +61,7 @@ private:
   bool prepare_command(MMonCommand *m);
 
  public:
-  LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
+  LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), log_version(0) { }
   
   void tick();  // check state, take actions
 };
index 063c20a277e952abc7953152656309176185b26d..0d7837ffecaaa083c8979e8e15c74dce51a87fb4 100644 (file)
@@ -238,7 +238,7 @@ int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
   return len;
 }
 
-int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync)
+int MonitorStore::write_bl_ss(bufferlist& bl, const char *a, const char *b, bool append, bool sync)
 {
   char fn[200];
   sprintf(fn, "%s/%s", dir.c_str(), a);
@@ -251,8 +251,13 @@ int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool s
   }
   
   char tfn[200];
-  sprintf(tfn, "%s.new", fn);
-  int fd = ::open(tfn, O_WRONLY|O_CREAT, 0644);
+  int fd;
+  if (append) {
+    fd = ::open(fn, O_WRONLY|O_CREAT|O_APPEND, 0644);
+  } else {
+    sprintf(tfn, "%s.new", fn);
+    fd = ::open(tfn, O_WRONLY|O_CREAT, 0644);
+  }
   assert(fd >= 0);
   
   // write data
@@ -269,7 +274,10 @@ int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool s
   if (sync)
     ::fsync(fd);
   ::close(fd);
-  ::rename(tfn, fn);
+  if (!append) {
+    ::rename(tfn, fn);
+  }
 
   return 0;
 }
+
index 01bae2bfe7e91e6427a47bc2d73036dc53f659e1..1bf0a1c910237de006449872f9dc5854d6fbbb57 100644 (file)
@@ -24,6 +24,7 @@ class MonitorStore {
   string dir;
   int lock_fd;
 
+  int write_bl_ss(bufferlist& bl, const char *a, const char *b, bool append, bool sync=true);
 public:
   MonitorStore(const char *d) : dir(d) { }
   ~MonitorStore() { }
@@ -42,7 +43,12 @@ public:
   // ss and sn varieties.
   bool exists_bl_ss(const char *a, const char *b=0);
   int get_bl_ss(bufferlist& bl, const char *a, const char *b);
-  int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true);
+  int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true) {
+    return write_bl_ss(bl, a, b, false, sync);
+  }
+  int append_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true) {
+    return write_bl_ss(bl, a, b, true, sync);
+  }
   bool exists_bl_sn(const char *a, version_t b) {
     char bs[20];
 #ifdef __LP64__