]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: system-wide log
authorSage Weil <sage@newdream.net>
Tue, 2 Dec 2008 22:28:30 +0000 (14:28 -0800)
committerSage Weil <sage@newdream.net>
Tue, 2 Dec 2008 22:28:30 +0000 (14:28 -0800)
Pretty rudimentary still.

src/Makefile.am
src/include/LogEntry.h [new file with mode: 0644]
src/messages/MLog.h [new file with mode: 0644]
src/mon/LogMonitor.cc [new file with mode: 0644]
src/mon/LogMonitor.h [new file with mode: 0644]
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/mon_types.h
src/msg/Message.cc
src/msg/Message.h

index 31bd0bcd62c372872330e127e95d2f51d9c32fd7..c613154fbd13e6cb68475cabb417f1c9a5840cd6 100644 (file)
@@ -196,6 +196,7 @@ libmon_a_SOURCES = \
        mon/MDSMonitor.cc \
        mon/ClientMonitor.cc \
        mon/PGMonitor.cc \
+       mon/LogMonitor.cc \
        mon/Elector.cc \
        mon/MonitorStore.cc
 
diff --git a/src/include/LogEntry.h b/src/include/LogEntry.h
new file mode 100644 (file)
index 0000000..8c5bb8a
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __CEPH_LOG_H
+#define __CEPH_LOG_H
+
+#include "include/types.h"
+#include "include/encoding.h"
+
+struct LogEntry {
+  entity_inst_t who;
+  utime_t stamp;
+  __u64 seq;
+  __u8 level;
+  string msg;
+
+  void encode(bufferlist& bl) const {
+    ::encode(who, bl);
+    ::encode(stamp, bl);
+    ::encode(seq, bl);
+    ::encode(level, bl);
+    ::encode(msg, bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    ::decode(who, bl);
+    ::decode(stamp, bl);
+    ::decode(seq, bl);
+    ::decode(level, bl);
+    ::decode(msg, bl);
+  }
+};
+WRITE_CLASS_ENCODER(LogEntry)
+
+inline ostream& operator<<(ostream& out, const LogEntry& e)
+{
+  return out << e.stamp << " " << e.who << " : " << e.seq << " : " << (int)e.level << " : " << e.msg;
+}
+
+#endif
diff --git a/src/messages/MLog.h b/src/messages/MLog.h
new file mode 100644 (file)
index 0000000..e56342a
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MLOG_H
+#define __MLOG_H
+
+#include "include/LogEntry.h"
+
+class MLog : public Message {
+public:
+  ceph_fsid fsid;
+  deque<LogEntry> entries;
+  version_t last;
+  
+  MLog() : Message(MSG_PGSTATS) {}
+  MLog(ceph_fsid& f) : 
+    Message(MSG_LOG), fsid(f) {}
+
+  const char *get_type_name() { return "log"; }
+  void print(ostream& out) {
+    out << "log";
+  }
+
+  void encode_payload() {
+    ::encode(fsid, payload);
+    ::encode(entries, payload);
+    ::encode(last, payload);
+  }
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(fsid, p);
+    ::decode(entries, p);
+    ::decode(last, p);
+  }
+};
+
+#endif
diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc
new file mode 100644 (file)
index 0000000..a07cb22
--- /dev/null
@@ -0,0 +1,299 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#include "LogMonitor.h"
+#include "Monitor.h"
+#include "MonitorStore.h"
+
+#include "messages/MMonCommand.h"
+#include "messages/MLog.h"
+
+#include "common/Timer.h"
+
+#include "osd/osd_types.h"
+#include "osd/PG.h"  // yuck
+
+#include "config.h"
+#include <sstream>
+
+#define DOUT_SUBSYS mon
+#undef dout_prefix
+#define dout_prefix _prefix(mon, paxos->get_version())
+static ostream& _prefix(Monitor *mon, version_t v) {
+  return *_dout << dbeginl
+               << "mon" << mon->whoami
+               << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)")))
+               << ".log v" << v << " ";
+}
+
+ostream& operator<<(ostream& out, LogMonitor& pm)
+{
+  std::stringstream ss;
+  /*
+  for (hash_map<int,int>::iterator p = pm.pg_map.num_pg_by_state.begin();
+       p != pm.pg_map.num_pg_by_state.end();
+       ++p) {
+    if (p != pm.pg_map.num_pg_by_state.begin())
+      ss << ", ";
+    ss << p->second << " " << pg_state_string(p->first);
+  }
+  string states = ss.str();
+  return out << "v" << pm.pg_map.version << ": "
+            << pm.pg_map.pg_stat.size() << " pgs: "
+            << states << "; "
+            << kb_t(pm.pg_map.total_pg_kb()) << " data, " 
+            << kb_t(pm.pg_map.total_used_kb()) << " used, "
+            << kb_t(pm.pg_map.total_avail_kb()) << " / "
+            << kb_t(pm.pg_map.total_kb()) << " free";
+  */
+  return out << "log";
+}
+
+/*
+ Tick function to update the map based on performance every N seconds
+*/
+
+void LogMonitor::tick() 
+{
+  if (!paxos->is_active()) return;
+
+  update_from_paxos();
+  dout(10) << *this << dendl;
+
+  if (!mon->is_leader()) return; 
+
+}
+
+void LogMonitor::create_initial()
+{
+  dout(10) << "create_initial -- creating initial map" << dendl;
+  LogEntry e;
+  memset(&e.who, 0, sizeof(e.who));
+  e.stamp = g_clock.now();
+  e.level = 0;
+  e.msg = "mkfs";
+  e.seq = 0;
+  stringstream ss;
+  ss << e;
+  string s;
+  getline(ss, s);
+  pending_inc.append(s);
+  pending_inc.append("\n");
+}
+
+bool LogMonitor::update_from_paxos()
+{
+  return true;
+}
+
+void LogMonitor::create_pending()
+{
+  pending_inc.clear();
+  dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl;
+}
+
+void LogMonitor::encode_pending(bufferlist &bl)
+{
+  dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl;
+  bl = pending_inc;
+}
+
+bool LogMonitor::preprocess_query(Message *m)
+{
+  dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
+  switch (m->get_type()) {
+  case MSG_MON_COMMAND:
+    return preprocess_command((MMonCommand*)m);
+
+  case MSG_LOG:
+    return preprocess_log((MLog*)m);
+
+  default:
+    assert(0);
+    delete m;
+    return true;
+  }
+}
+
+bool LogMonitor::prepare_update(Message *m)
+{
+  dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
+  switch (m->get_type()) {
+  case MSG_MON_COMMAND:
+    return prepare_command((MMonCommand*)m);
+  case MSG_LOG:
+    return prepare_log((MLog*)m);
+  default:
+    assert(0);
+    delete m;
+    return false;
+  }
+}
+
+void LogMonitor::committed()
+{
+
+}
+
+bool LogMonitor::preprocess_log(MLog *m)
+{
+  return false;
+}
+
+bool LogMonitor::prepare_log(MLog *m) 
+{
+  dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
+
+  if (!ceph_fsid_equal(&m->fsid, &mon->monmap->fsid)) {
+    dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl;
+    delete m;
+    return false;
+  }
+
+  for (deque<LogEntry>::iterator p = m->entries.begin();
+       p != m->entries.end();
+       p++) {
+    dout(10) << " logging " << *p << dendl;
+    stringstream ss;
+    ss << *p;
+    string s;
+    getline(ss, s);
+    pending_inc.append(s);
+    pending_inc.append("\n");
+  }
+
+  paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
+  return true;
+}
+
+void LogMonitor::_updated_log(MLog *ack, entity_inst_t who)
+{
+  dout(7) << "_updated_log for " << who << dendl;
+  mon->messenger->send_message(ack, who);
+}
+
+
+
+bool LogMonitor::preprocess_command(MMonCommand *m)
+{
+  int r = -1;
+  bufferlist rdata;
+  stringstream ss;
+
+  /*
+  if (m->cmd.size() > 1) {
+    if (m->cmd[1] == "stat") {
+      ss << *this;
+      r = 0;
+    }
+    else if (m->cmd[1] == "getmap") {
+      pg_map.encode(rdata);
+      ss << "got pgmap version " << pg_map.version;
+      r = 0;
+    }
+    else if (m->cmd[1] == "send_pg_creates") {
+      send_pg_creates();
+      ss << "sent pg creates ";
+      r = 0;
+    }
+    else if (m->cmd[1] == "dump") {
+      ss << "version " << pg_map.version << std::endl;
+      ss << "last_osdmap_epoch " << pg_map.last_osdmap_epoch << std::endl;
+      ss << "last_pg_scan " << pg_map.last_pg_scan << std::endl;
+      ss << "pg_stat\tobjects\tkb\tbytes\tv\treported\tstate\tosds" << std::endl;
+      for (set<pg_t>::iterator p = pg_map.pg_set.begin();
+          p != pg_map.pg_set.end();
+          p++) {
+       pg_stat_t &st = pg_map.pg_stat[*p];
+       ss << *p 
+          << "\t" << st.num_objects
+          << "\t" << st.num_kb
+          << "\t" << st.num_bytes
+          << "\t" << pg_state_string(st.state)
+          << "\t" << st.version
+          << "\t" << st.reported
+          << "\t" << st.acting
+          << std::endl;
+      }
+      ss << "osdstat\tobject\tkbused\tkbavail\tkb\thb in\thb out" << std::endl;
+      for (hash_map<int,osd_stat_t>::iterator p = pg_map.osd_stat.begin();
+          p != pg_map.osd_stat.end();
+          p++)
+       ss << p->first
+          << "\t" << p->second.num_objects
+          << "\t" << p->second.kb_used
+          << "\t" << p->second.kb_avail 
+          << "\t" << p->second.kb
+          << "\t" << p->second.hb_in
+          << "\t" << p->second.hb_out
+          << std::endl;
+      while (!ss.eof()) {
+       string s;
+       getline(ss, s);
+       rdata.append(s.c_str(), s.length());
+       rdata.append("\n", 1);
+      }
+      ss << "ok";
+      r = 0;
+    }
+    else if (m->cmd[1] == "scrub" && m->cmd.size() == 3) {
+      pg_t pgid;
+      r = -EINVAL;
+      if (pgid.parse(m->cmd[2].c_str())) {
+       if (mon->pgmon->pg_map.pg_stat.count(pgid)) {
+         if (mon->pgmon->pg_map.pg_stat[pgid].acting.size()) {
+           int osd = mon->pgmon->pg_map.pg_stat[pgid].acting[0];
+           if (mon->osdmon->osdmap.is_up(osd)) {
+             vector<pg_t> pgs(1);
+             pgs[0] = pgid;
+             mon->messenger->send_message(new MOSDScrub(mon->monmap->fsid, pgs),
+                                          mon->osdmon->osdmap.get_inst(osd));
+             ss << "instructing pg " << pgid << " on osd" << osd << " to scrub";
+             r = 0;
+           } else
+             ss << "pg " << pgid << " primary osd" << osd << " not up";
+         } else
+           ss << "pg " << pgid << " has no primary osd";
+       } else
+         ss << "pg " << pgid << " dne";
+      } else
+       ss << "invalid pgid '" << m->cmd[2] << "'";
+    }
+  }
+  */
+
+  if (r != -1) {
+    string rs;
+    getline(ss, rs);
+    mon->reply_command(m, r, rs, rdata);
+    return true;
+  } else
+    return false;
+}
+
+
+bool LogMonitor::prepare_command(MMonCommand *m)
+{
+  stringstream ss;
+  string rs;
+  int err = -EINVAL;
+
+  // nothing here yet
+  ss << "unrecognized command";
+
+  getline(ss, rs);
+  mon->reply_command(m, err, rs);
+  return false;
+}
diff --git a/src/mon/LogMonitor.h b/src/mon/LogMonitor.h
new file mode 100644 (file)
index 0000000..7d9eb7c
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __LOGMONITOR_H
+#define __LOGMONITOR_H
+
+#include <map>
+#include <set>
+using namespace std;
+
+#include "include/types.h"
+#include "msg/Messenger.h"
+#include "PaxosService.h"
+
+#include "include/LogEntry.h"
+
+class MMonCommand;
+class MLog;
+
+class LogMonitor : public PaxosService {
+private:
+  bufferlist pending_inc;
+
+  void create_initial();
+  bool update_from_paxos();
+  void create_pending();  // prepare a new pending
+  void encode_pending(bufferlist &bl);  // propose pending update to peers
+
+  void committed();
+
+  bool preprocess_query(Message *m);  // true if processed.
+  bool prepare_update(Message *m);
+
+  bool preprocess_log(MLog *m);
+  bool prepare_log(MLog *m);
+  void _updated_log(MLog *m, entity_inst_t who);
+
+  struct C_Log : public Context {
+    LogMonitor *logmon;
+    MLog *ack;
+    entity_inst_t who;
+    C_Log(LogMonitor *p, MLog *a, entity_inst_t w) : logmon(p), ack(a), who(w) {}
+    void finish(int r) {
+      logmon->_updated_log(ack, who);
+    }    
+  };
+
+  bool preprocess_command(MMonCommand *m);
+  bool prepare_command(MMonCommand *m);
+
+ public:
+  LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
+  
+  void tick();  // check state, take actions
+};
+
+#endif
index 3147091d04a4df24f1ad161475c483d6e8d478cd..79d397266af19af5af73e2633ed8f9234ffe0615 100644 (file)
@@ -38,6 +38,7 @@
 #include "MDSMonitor.h"
 #include "ClientMonitor.h"
 #include "PGMonitor.h"
+#include "LogMonitor.h"
 
 #include "config.h"
 
@@ -75,6 +76,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) :
   paxos_osdmap(this, w, PAXOS_OSDMAP),
   paxos_clientmap(this, w, PAXOS_CLIENTMAP),
   paxos_pgmap(this, w, PAXOS_PGMAP),
+  paxos_log(this, w, PAXOS_LOG),
   
   osdmon(0), mdsmon(0), clientmon(0)
 {
@@ -82,6 +84,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) :
   mdsmon = new MDSMonitor(this, &paxos_mdsmap);
   clientmon = new ClientMonitor(this, &paxos_clientmap);
   pgmon = new PGMonitor(this, &paxos_pgmap);
+  logmon = new LogMonitor(this, &paxos_log);
 }
 
 Monitor::~Monitor()
@@ -90,6 +93,7 @@ Monitor::~Monitor()
   delete mdsmon;
   delete clientmon;
   delete pgmon;
+  delete logmon;
   if (messenger)
     messenger->destroy();
 }
@@ -137,6 +141,7 @@ void Monitor::shutdown()
   mdsmon->shutdown();
   clientmon->shutdown();
   pgmon->shutdown();
+  logmon->shutdown();
 
   // cancel all events
   cancel_tick();
@@ -160,11 +165,13 @@ void Monitor::call_election()
   paxos_osdmap.election_starting();
   paxos_clientmap.election_starting();
   paxos_pgmap.election_starting();
+  paxos_log.election_starting();
 
   mdsmon->election_starting();
   osdmon->election_starting();
   clientmon->election_starting();
   pgmon->election_starting();
+  logmon->election_starting();
   
   // call a new election
   elector.call_election();
@@ -183,12 +190,14 @@ void Monitor::win_election(epoch_t epoch, set<int>& active)
   paxos_osdmap.leader_init();
   paxos_clientmap.leader_init();
   paxos_pgmap.leader_init();
+  paxos_log.leader_init();
   
   // init
   pgmon->election_finished();  // hack: before osdmon, for osd->pg kick works ok
   osdmon->election_finished();
   mdsmon->election_finished();
   clientmon->election_finished();
+  logmon->election_finished();
 } 
 
 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l) 
@@ -205,12 +214,14 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l)
   paxos_osdmap.peon_init();
   paxos_clientmap.peon_init();
   paxos_pgmap.peon_init();
+  paxos_log.peon_init();
   
   // init
   osdmon->election_finished();
   mdsmon->election_finished();
   clientmon->election_finished();
   pgmon->election_finished();
+  logmon->election_finished();
 }
 
 void Monitor::handle_command(MMonCommand *m)
@@ -373,6 +384,10 @@ void Monitor::dispatch(Message *m)
       pgmon->dispatch(m);
       break;
 
+      // log
+    case MSG_LOG:
+      logmon->dispatch(m);
+      break;
 
       // paxos
     case MSG_MON_PAXOS:
@@ -401,6 +416,9 @@ void Monitor::dispatch(Message *m)
        case PAXOS_PGMAP:
          paxos_pgmap.dispatch(m);
          break;
+       case PAXOS_LOG:
+         paxos_log.dispatch(m);
+         break;
        default:
          assert(0);
        }
@@ -502,6 +520,7 @@ void Monitor::tick()
   mdsmon->tick();
   clientmon->tick();
   pgmon->tick();
+  logmon->tick();
   
   // next tick!
   reset_tick();
@@ -541,6 +560,7 @@ int Monitor::mkfs()
   services.push_back(mdsmon);
   services.push_back(clientmon);
   services.push_back(pgmon);
+  services.push_back(logmon);
   for (list<PaxosService*>::iterator p = services.begin(); 
        p != services.end();
        p++) {
index 9acceccaf4f577ade97e84d1309a147e8449aaed..78048dda27c0e6cfff8c0e15117182235422e2e3 100644 (file)
@@ -38,6 +38,7 @@ class OSDMonitor;
 class MDSMonitor;
 class ClientMonitor;
 class PGMonitor;
+class LogMonitor;
 
 class MMonGetMap;
 
@@ -104,6 +105,7 @@ public:
   Paxos paxos_osdmap;
   Paxos paxos_clientmap;
   Paxos paxos_pgmap;
+  Paxos paxos_log;
   friend class Paxos;
   
 
@@ -112,11 +114,13 @@ public:
   MDSMonitor *mdsmon;
   ClientMonitor *clientmon;
   PGMonitor *pgmon;
+  LogMonitor *logmon;
 
   friend class OSDMonitor;
   friend class MDSMonitor;
   friend class ClientMonitor;
   friend class PGMonitor;
+  friend class LogMonitor;
 
 
   // messages
index 8d1ac92822356e8f51fa661fcf0c38c6e9c2278f..bcba5729ef130555846955a0674dc0e413ade2cd 100644 (file)
@@ -20,6 +20,7 @@
 #define PAXOS_OSDMAP     2
 #define PAXOS_CLIENTMAP  3
 #define PAXOS_PGMAP      4
+#define PAXOS_LOG        5
 
 inline const char *get_paxos_name(int p) {
   switch (p) {
@@ -28,6 +29,7 @@ inline const char *get_paxos_name(int p) {
   case PAXOS_OSDMAP: return "osdmap";
   case PAXOS_CLIENTMAP: return "clientmap";
   case PAXOS_PGMAP: return "pgmap";
+  case PAXOS_LOG: return "log";
   default: assert(0); return 0;
   }
 }
index f286ca89246d9a0746cb543ecfad154008b7ff2f..3e322a1b40bda0c20db694dbe1470a5bf5867488 100644 (file)
@@ -21,6 +21,7 @@ using namespace std;
 #include "messages/MMonPaxos.h"
 
 #include "messages/MMonElection.h"
+#include "messages/MLog.h"
 
 #include "messages/MPing.h"
 
@@ -164,6 +165,10 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
     m = new MMonElection;
     break;
 
+  case MSG_LOG:
+    m = new MLog;
+    break;
+
   case CEPH_MSG_PING:
     m = new MPing();
     break;
index fe4e7dd8648e88b7bcc7103eccb7bae5ede89dc9..bf6eca6d497b56e90be25042f0a5725e45f0e50f 100644 (file)
@@ -26,6 +26,8 @@
 #define MSG_MON_COMMAND            50
 #define MSG_MON_COMMAND_ACK        51
 
+#define MSG_LOG              52
+
 // osd internal
 #define MSG_OSD_PING         70
 #define MSG_OSD_BOOT         71