Pretty rudimentary still.
mon/MDSMonitor.cc \
mon/ClientMonitor.cc \
mon/PGMonitor.cc \
+ mon/LogMonitor.cc \
mon/Elector.cc \
mon/MonitorStore.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __CEPH_LOG_H
+#define __CEPH_LOG_H
+
+#include "include/types.h"
+#include "include/encoding.h"
+
+// One log record.  encode() and decode() must list the fields in the same
+// order: decode() reads them back in exactly the order encode() wrote them,
+// so reordering either side changes the encoded format.
+struct LogEntry {
+ // presumably the entity that generated the entry -- confirm against senders
+ entity_inst_t who;
+ // timestamp of the entry (create_initial() in LogMonitor uses g_clock.now())
+ utime_t stamp;
+ // sequence number; NOTE(review): scope (per sender vs. global) is not visible here
+ __u64 seq;
+ // numeric level; operator<< prints it as an integer
+ __u8 level;
+ // message text
+ string msg;
+
+ // Append all fields to bl, in declaration order.
+ void encode(bufferlist& bl) const {
+ ::encode(who, bl);
+ ::encode(stamp, bl);
+ ::encode(seq, bl);
+ ::encode(level, bl);
+ ::encode(msg, bl);
+ }
+ // Read all fields from bl, in the same order encode() wrote them.
+ void decode(bufferlist::iterator& bl) {
+ ::decode(who, bl);
+ ::decode(stamp, bl);
+ ::decode(seq, bl);
+ ::decode(level, bl);
+ ::decode(msg, bl);
+ }
+};
+WRITE_CLASS_ENCODER(LogEntry)
+
+// Format a LogEntry as "<stamp> <who> : <seq> : <level> : <msg>".
+// level is converted to int so that it is streamed as a number (a __u8
+// would presumably be streamed as a character otherwise).
+inline ostream& operator<<(ostream& out, const LogEntry& e)
+{
+  out << e.stamp << " " << e.who;
+  out << " : " << e.seq;
+  out << " : " << static_cast<int>(e.level);
+  out << " : " << e.msg;
+  return out;
+}
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MLOG_H
+#define __MLOG_H
+
+#include "include/LogEntry.h"
+
+// Message carrying a batch of LogEntry records together with the cluster
+// fsid.  The payload is fsid, entries, last -- encode_payload() and
+// decode_payload() must keep that order in sync.
+class MLog : public Message {
+public:
+  ceph_fsid fsid;
+  deque<LogEntry> entries;
+  // NOTE(review): meaning of 'last' is not visible here (presumably the last
+  // version/sequence seen) -- confirm.  It is always encoded, so it is
+  // zero-initialized rather than left indeterminate.
+  version_t last;
+
+  // Default constructor (used when a message is instantiated for decoding).
+  // Must carry the MSG_LOG type, like the fsid constructor below.
+  MLog() : Message(MSG_LOG), last(0) {}
+  MLog(ceph_fsid& f) :
+    Message(MSG_LOG), fsid(f), last(0) {}
+
+  const char *get_type_name() { return "log"; }
+  void print(ostream& out) {
+    out << "log";
+  }
+
+  // Serialize fsid, entries and last into the message payload.
+  void encode_payload() {
+    ::encode(fsid, payload);
+    ::encode(entries, payload);
+    ::encode(last, payload);
+  }
+  // Deserialize the payload in the same order encode_payload() wrote it.
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(fsid, p);
+    ::decode(entries, p);
+    ::decode(last, p);
+  }
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include "LogMonitor.h"
+#include "Monitor.h"
+#include "MonitorStore.h"
+
+#include "messages/MMonCommand.h"
+#include "messages/MLog.h"
+
+#include "common/Timer.h"
+
+#include "osd/osd_types.h"
+#include "osd/PG.h" // yuck
+
+#include "config.h"
+#include <sstream>
+
+#define DOUT_SUBSYS mon
+#undef dout_prefix
+#define dout_prefix _prefix(mon, paxos->get_version())
+// Write the debug-line prefix for this subsystem:
+//   mon<whoami>(<state>).log v<version><space>
+// where <state> is starting, leader, peon, or "??" if none applies.
+static ostream& _prefix(Monitor *mon, version_t v) {
+  // the second '?' is escaped so that "??)" is not read as a trigraph
+  const char *state = "(?\?)";
+  if (mon->is_starting())
+    state = "(starting)";
+  else if (mon->is_leader())
+    state = "(leader)";
+  else if (mon->is_peon())
+    state = "(peon)";
+
+  return *_dout << dbeginl
+		<< "mon" << mon->whoami
+		<< state
+		<< ".log v" << v << " ";
+}
+
+// Stream a short summary of the log monitor.  The monitor keeps no state
+// worth summarizing yet, so this writes the fixed tag "log".
+ostream& operator<<(ostream& out, LogMonitor& pm)
+{
+  return out << "log";
+}
+
+/*
+  Periodic tick.  Does nothing unless paxos is active; otherwise calls
+  update_from_paxos() and logs the monitor summary at debug level 10.
+  There is no leader-only periodic work yet.
+*/
+
+void LogMonitor::tick()
+{
+  if (!paxos->is_active())
+    return;
+
+  update_from_paxos();
+  dout(10) << *this << dendl;
+
+  // leader-only work would follow this check; nothing to do yet
+  if (!mon->is_leader())
+    return;
+}
+
+// Seed the pending increment with a single "mkfs" entry: zeroed source,
+// current time, level 0, seq 0.  The entry is formatted with operator<<
+// and its first line is appended to pending_inc, followed by a newline.
+void LogMonitor::create_initial()
+{
+  dout(10) << "create_initial -- creating initial map" << dendl;
+
+  LogEntry first;
+  memset(&first.who, 0, sizeof(first.who));
+  first.stamp = g_clock.now();
+  first.seq = 0;
+  first.level = 0;
+  first.msg = "mkfs";
+
+  stringstream formatted;
+  formatted << first;
+  string line;
+  getline(formatted, line);
+
+  pending_inc.append(line);
+  pending_inc.append("\n");
+}
+
+// No state is rebuilt from paxos yet; always returns true.
+bool LogMonitor::update_from_paxos()
+{
+  return true;
+}
+
+// Start a fresh pending increment: discard any accumulated text and log
+// the version the next proposal would carry (current paxos version + 1).
+void LogMonitor::create_pending()
+{
+  pending_inc.clear();
+
+  dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl;
+}
+
+// Hand the pending increment to the caller: bl is overwritten with the
+// contents of pending_inc (the accumulated log text).
+void LogMonitor::encode_pending(bufferlist &bl)
+{
+  dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl;
+
+  bl = pending_inc;
+}
+
+// Dispatch the read-only phase by message type.  Returns true if the
+// message was fully processed here.  Any type other than MSG_MON_COMMAND
+// or MSG_LOG is a programming error: it trips the assert, and with
+// asserts compiled out the message is deleted and reported as processed.
+bool LogMonitor::preprocess_query(Message *m)
+{
+  dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
+
+  const int type = m->get_type();
+  if (type == MSG_MON_COMMAND)
+    return preprocess_command(static_cast<MMonCommand*>(m));
+  if (type == MSG_LOG)
+    return preprocess_log(static_cast<MLog*>(m));
+
+  assert(0);
+  delete m;
+  return true;
+}
+
+// Dispatch the update phase by message type and return the handler's
+// result.  Any type other than MSG_MON_COMMAND or MSG_LOG is a programming
+// error: it trips the assert, and with asserts compiled out the message is
+// deleted and false is returned.
+bool LogMonitor::prepare_update(Message *m)
+{
+  dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
+
+  const int type = m->get_type();
+  if (type == MSG_MON_COMMAND)
+    return prepare_command(static_cast<MMonCommand*>(m));
+  if (type == MSG_LOG)
+    return prepare_log(static_cast<MLog*>(m));
+
+  assert(0);
+  delete m;
+  return false;
+}
+
+// Commit hook; intentionally empty for now.
+// NOTE(review): presumably invoked after a proposed update commits -- confirm.
+void LogMonitor::committed()
+{
+}
+
+// Read-only phase for MLog: nothing is handled here, so every log message
+// is reported as not processed (false).
+bool LogMonitor::preprocess_log(MLog *m)
+{
+  return false;
+}
+
+// Queue the entries of an incoming MLog for the next proposal.
+//
+// A message whose fsid differs from the monmap's is logged, deleted and
+// rejected (false).  Otherwise each entry is formatted with operator<<
+// and the first line of that text is appended to pending_inc, followed by
+// a newline (getline stops at the first newline, so anything after an
+// embedded newline in an entry is not recorded).  A C_Log completion is
+// then registered with paxos->wait_for_commit(); it hands the message and
+// the sender's address to _updated_log().  Returns true.
+bool LogMonitor::prepare_log(MLog *m)
+{
+  dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
+
+  if (!ceph_fsid_equal(&m->fsid, &mon->monmap->fsid)) {
+    dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl;
+    delete m;
+    return false;
+  }
+
+  deque<LogEntry>::iterator entry = m->entries.begin();
+  while (entry != m->entries.end()) {
+    dout(10) << " logging " << *entry << dendl;
+
+    stringstream formatted;
+    formatted << *entry;
+    string line;
+    getline(formatted, line);
+
+    pending_inc.append(line);
+    pending_inc.append("\n");
+
+    ++entry;
+  }
+
+  paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
+  return true;
+}
+
+// Completion for prepare_log() (reached through C_Log::finish): send the
+// ack message to the given peer.
+void LogMonitor::_updated_log(MLog *ack, entity_inst_t who)
+{
+  dout(7) << "_updated_log for " << who << dendl;
+
+  mon->messenger->send_message(ack, who);
+}
+
+
+
+// Read-only command handling.  No log commands are implemented yet, so r
+// never leaves its -1 sentinel and every command is reported as not
+// processed (false).  The reply path is already in place: a future command
+// sets r, writes its status line to ss and any payload to rdata.
+bool LogMonitor::preprocess_command(MMonCommand *m)
+{
+  int r = -1;
+  bufferlist rdata;
+  stringstream ss;
+
+  // (no commands recognized yet)
+
+  if (r == -1)
+    return false;
+
+  string rs;
+  getline(ss, rs);
+  mon->reply_command(m, r, rs, rdata);
+  return true;
+}
+
+
+// Update-phase command handling.  No commands are implemented yet: every
+// command is answered with -EINVAL and the text "unrecognized command",
+// and false is returned.
+bool LogMonitor::prepare_command(MMonCommand *m)
+{
+  // nothing here yet
+  stringstream ss;
+  ss << "unrecognized command";
+
+  string rs;
+  getline(ss, rs);
+
+  int err = -EINVAL;
+  mon->reply_command(m, err, rs);
+  return false;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __LOGMONITOR_H
+#define __LOGMONITOR_H
+
+#include <map>
+#include <set>
+using namespace std;
+
+#include "include/types.h"
+#include "msg/Messenger.h"
+#include "PaxosService.h"
+
+#include "include/LogEntry.h"
+
+class MMonCommand;
+class MLog;
+
+// Paxos service that collects log entries from MLog messages.  Incoming
+// entries are formatted as text lines and accumulated in pending_inc,
+// which encode_pending() hands out for the next proposal.
+class LogMonitor : public PaxosService {
+private:
+  // text lines queued for the next proposal
+  bufferlist pending_inc;
+
+  // service state handling (presumably PaxosService hooks -- confirm)
+  void create_initial();
+  bool update_from_paxos();
+  void create_pending();                 // prepare a new pending
+  void encode_pending(bufferlist &bl);   // propose pending update to peers
+  void committed();
+
+  // message dispatch
+  bool preprocess_query(Message *m);     // true if processed.
+  bool prepare_update(Message *m);
+
+  // MLog handling
+  bool preprocess_log(MLog *m);
+  bool prepare_log(MLog *m);
+  void _updated_log(MLog *m, entity_inst_t who);
+
+  // MMonCommand handling
+  bool preprocess_command(MMonCommand *m);
+  bool prepare_command(MMonCommand *m);
+
+  // Completion context: on finish, forwards the stored message and peer
+  // to LogMonitor::_updated_log().  The result code r is ignored.
+  struct C_Log : public Context {
+    LogMonitor *logmon;
+    MLog *ack;
+    entity_inst_t who;
+
+    C_Log(LogMonitor *p, MLog *a, entity_inst_t w)
+      : logmon(p), ack(a), who(w) {}
+
+    void finish(int r) {
+      logmon->_updated_log(ack, who);
+    }
+  };
+
+ public:
+  LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
+
+  void tick();  // check state, take actions
+};
+
+#endif
#include "MDSMonitor.h"
#include "ClientMonitor.h"
#include "PGMonitor.h"
+#include "LogMonitor.h"
#include "config.h"
paxos_osdmap(this, w, PAXOS_OSDMAP),
paxos_clientmap(this, w, PAXOS_CLIENTMAP),
paxos_pgmap(this, w, PAXOS_PGMAP),
+ paxos_log(this, w, PAXOS_LOG),
osdmon(0), mdsmon(0), clientmon(0)
{
mdsmon = new MDSMonitor(this, &paxos_mdsmap);
clientmon = new ClientMonitor(this, &paxos_clientmap);
pgmon = new PGMonitor(this, &paxos_pgmap);
+ logmon = new LogMonitor(this, &paxos_log);
}
Monitor::~Monitor()
delete mdsmon;
delete clientmon;
delete pgmon;
+ delete logmon;
if (messenger)
messenger->destroy();
}
mdsmon->shutdown();
clientmon->shutdown();
pgmon->shutdown();
+ logmon->shutdown();
// cancel all events
cancel_tick();
paxos_osdmap.election_starting();
paxos_clientmap.election_starting();
paxos_pgmap.election_starting();
+ paxos_log.election_starting();
mdsmon->election_starting();
osdmon->election_starting();
clientmon->election_starting();
pgmon->election_starting();
+ logmon->election_starting();
// call a new election
elector.call_election();
paxos_osdmap.leader_init();
paxos_clientmap.leader_init();
paxos_pgmap.leader_init();
+ paxos_log.leader_init();
// init
pgmon->election_finished(); // hack: before osdmon, for osd->pg kick works ok
osdmon->election_finished();
mdsmon->election_finished();
clientmon->election_finished();
+ logmon->election_finished();
}
void Monitor::lose_election(epoch_t epoch, set<int> &q, int l)
paxos_osdmap.peon_init();
paxos_clientmap.peon_init();
paxos_pgmap.peon_init();
+ paxos_log.peon_init();
// init
osdmon->election_finished();
mdsmon->election_finished();
clientmon->election_finished();
pgmon->election_finished();
+ logmon->election_finished();
}
void Monitor::handle_command(MMonCommand *m)
pgmon->dispatch(m);
break;
+ // log
+ case MSG_LOG:
+ logmon->dispatch(m);
+ break;
// paxos
case MSG_MON_PAXOS:
case PAXOS_PGMAP:
paxos_pgmap.dispatch(m);
break;
+ case PAXOS_LOG:
+ paxos_log.dispatch(m);
+ break;
default:
assert(0);
}
mdsmon->tick();
clientmon->tick();
pgmon->tick();
+ logmon->tick();
// next tick!
reset_tick();
services.push_back(mdsmon);
services.push_back(clientmon);
services.push_back(pgmon);
+ services.push_back(logmon);
for (list<PaxosService*>::iterator p = services.begin();
p != services.end();
p++) {
class MDSMonitor;
class ClientMonitor;
class PGMonitor;
+class LogMonitor;
class MMonGetMap;
Paxos paxos_osdmap;
Paxos paxos_clientmap;
Paxos paxos_pgmap;
+ Paxos paxos_log;
friend class Paxos;
MDSMonitor *mdsmon;
ClientMonitor *clientmon;
PGMonitor *pgmon;
+ LogMonitor *logmon;
friend class OSDMonitor;
friend class MDSMonitor;
friend class ClientMonitor;
friend class PGMonitor;
+ friend class LogMonitor;
// messages
#define PAXOS_OSDMAP 2
#define PAXOS_CLIENTMAP 3
#define PAXOS_PGMAP 4
+#define PAXOS_LOG 5
inline const char *get_paxos_name(int p) {
switch (p) {
case PAXOS_OSDMAP: return "osdmap";
case PAXOS_CLIENTMAP: return "clientmap";
case PAXOS_PGMAP: return "pgmap";
+ case PAXOS_LOG: return "log";
default: assert(0); return 0;
}
}
#include "messages/MMonPaxos.h"
#include "messages/MMonElection.h"
+#include "messages/MLog.h"
#include "messages/MPing.h"
m = new MMonElection;
break;
+ case MSG_LOG:
+ m = new MLog;
+ break;
+
case CEPH_MSG_PING:
m = new MPing();
break;
#define MSG_MON_COMMAND 50
#define MSG_MON_COMMAND_ACK 51
+#define MSG_LOG 52
+
// osd internal
#define MSG_OSD_PING 70
#define MSG_OSD_BOOT 71