From dbe457f5d776a19a4bbdf1e87bf99a17801e43fd Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 8 May 2009 16:33:30 -0700 Subject: [PATCH] class: add a class monitor for the paxos services --- src/Makefile.am | 5 + src/ceph.cc | 28 ++++ src/include/ClassEntry.h | 118 +++++++++++++++++ src/include/rados.h | 3 +- src/messages/MClass.h | 53 ++++++++ src/messages/MClassAck.h | 44 +++++++ src/mon/ClassMonitor.cc | 272 +++++++++++++++++++++++++++++++++++++++ src/mon/ClassMonitor.h | 69 ++++++++++ src/mon/Monitor.cc | 2 + src/mon/mon_types.h | 4 +- src/msg/Message.h | 1 + 11 files changed, 597 insertions(+), 2 deletions(-) create mode 100644 src/include/ClassEntry.h create mode 100644 src/messages/MClass.h create mode 100644 src/messages/MClassAck.h create mode 100644 src/mon/ClassMonitor.cc create mode 100644 src/mon/ClassMonitor.h diff --git a/src/Makefile.am b/src/Makefile.am index 8d75b8f688725..fdd37eb5d9a5f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -232,6 +232,7 @@ libmon_a_SOURCES = \ mon/ClientMonitor.cc \ mon/PGMonitor.cc \ mon/LogMonitor.cc \ + mon/ClassMonitor.cc \ mon/Elector.cc \ mon/MonitorStore.cc @@ -329,6 +330,7 @@ noinst_HEADERS = \ include/Context.h\ include/Distribution.h\ include/LogEntry.h\ + include/ClassEntry.h\ include/assert.h\ include/atomic.h\ include/bitmapper.h\ @@ -449,6 +451,8 @@ noinst_HEADERS = \ mds/mdstypes.h\ mds/snap.h\ messages/MCacheExpire.h\ + messages/MClass.h\ + messages/MClassAck.h\ messages/MClientCaps.h\ messages/MClientCapRelease.h\ messages/MClientLease.h\ @@ -528,6 +532,7 @@ noinst_HEADERS = \ messages/MRemoveSnaps.h\ messages/MStatfs.h\ messages/MStatfsReply.h\ + mon/ClassMonitor.h\ mon/ClientMap.h\ mon/ClientMonitor.h\ mon/Elector.h\ diff --git a/src/ceph.cc b/src/ceph.cc index 17e9d4ca125a7..8c0fcecd894cc 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -71,6 +71,7 @@ Context *resend_event = 0; #include "osd/OSDMap.h" #include "mds/MDSMap.h" #include "include/LogEntry.h" +#include "include/ClassEntry.h" #include "mon/mon_types.h" @@ -176,6 +177,33 @@ void handle_notify(MMonObserveNotify *notify) } break; } + + case PAXOS_CLASS: + { + bufferlist::iterator p = notify->bl.begin(); + if (notify->is_latest) { + ClassList list; + ::decode(list, p); + // show the first class info + map::iterator iter = list.library_map.begin(); + if (iter != list.library_map.end()) { + dout(0) << " class " << iter->first << dendl; + } + } else { + ClassImpl impl; + impl.decode(p); + + dout(0) << " class " << impl << dendl; +#if 0 + ClassEntry le; + while (!p.end()) { + le.decode(p); + dout(0) << " class " << le << dendl; + } +#endif + } + break; + } } map_ver[notify->machine_id] = notify->ver; diff --git a/src/include/ClassEntry.h b/src/include/ClassEntry.h new file mode 100644 index 0000000000000..40ef2dacdd39a --- /dev/null +++ b/src/include/ClassEntry.h @@ -0,0 +1,118 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __CLASSENTRY_H +#define __CLASSENTRY_H + +#include "include/types.h" +#include "include/encoding.h" + +struct ClassImpl { + string name; + version_t version; + bufferlist binary; + utime_t stamp; + version_t seq; + + void encode(bufferlist& bl) const { + ::encode(name, bl); + ::encode(version, bl); + ::encode(seq, bl); + } + void decode(bufferlist::iterator& bl) { + ::decode(name, bl); + ::decode(version, bl); + ::decode(seq, bl); + } +}; + + +WRITE_CLASS_ENCODER(ClassImpl) + + +struct ClassLibrary { + string name; + version_t version; + + void encode(bufferlist& bl) const { + ::encode(name, bl); + ::encode(version, bl); + } + void decode(bufferlist::iterator& bl) { + ::decode(name, bl); + ::decode(version, bl); + } +}; + +WRITE_CLASS_ENCODER(ClassLibrary) + +struct ClassLibraryIncremental { + bool add; + bufferlist info; + bufferlist impl; + + void encode(bufferlist& bl) const { + ::encode(info, bl); + ::encode(impl, bl); + } + void decode(bufferlist::iterator& bl) { + ::decode(info, bl); + ::decode(impl, bl); + } + + void decode_impl(ClassImpl& i) { + ::decode(i, impl); + } +}; + +WRITE_CLASS_ENCODER(ClassLibraryIncremental) + +struct ClassList { + version_t version; + map library_map; + + ClassList() : version(0) {} + + void add(const string& name, const version_t version) { + ClassLibrary library; + library.version = version; + library_map[name] = library; + } + + bool contains(string& name) { + return (library_map.find(name) != library_map.end()); + } + + void encode(bufferlist& bl) const { + ::encode(version, bl); + ::encode(library_map, bl); + } + void decode(bufferlist::iterator& bl) { + ::decode(version, bl); + ::decode(library_map, bl); + } +}; +WRITE_CLASS_ENCODER(ClassList) + +inline ostream& operator<<(ostream& out, const ClassLibrary& e) +{ + return out << e.name << " (v" << e.version << ")"; +} + +inline ostream& operator<<(ostream& out, const ClassImpl& e) +{ + return out << e.name << " (v" << e.version << ")"; +} + +#endif diff --git a/src/include/rados.h b/src/include/rados.h index 2e7edaac59d69..ec09487892c1b 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -219,7 +219,8 @@ enum { CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6, /** exec **/ - CEPH_OSD_OP_EXEC = CEPH_OSD_OP_MODE_EXEC | 1, + CEPH_OSD_OP_LOAD_CLASS = CEPH_OSD_OP_MODE_EXEC | 1, + CEPH_OSD_OP_EXEC = CEPH_OSD_OP_MODE_EXEC | 2, }; static inline int ceph_osd_op_type_lock(int op) diff --git a/src/messages/MClass.h b/src/messages/MClass.h new file mode 100644 index 0000000000000..5516feb659798 --- /dev/null +++ b/src/messages/MClass.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MCLASS_H +#define __MCLASS_H + +#include "include/ClassEntry.h" + +class MClass : public Message { +public: + ceph_fsid_t fsid; + deque entries; + version_t last; + + MClass() : Message(MSG_CLASS) {} + MClass(ceph_fsid_t& f, deque& e) : Message(MSG_CLASS), fsid(f), entries(e), last(0) { } + MClass(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS), fsid(f), last(l) {} + + const char *get_type_name() { return "class"; } + void print(ostream& out) { + out << "class("; + if (entries.size()) + out << entries.size() << " entries"; + if (last) + out << "last " << last; + out << ")"; + } + + void encode_payload() { + ::encode(fsid, payload); + ::encode(entries, payload); + ::encode(last, payload); + } + void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(fsid, p); + ::decode(entries, p); + ::decode(last, p); + } +}; + +#endif diff --git a/src/messages/MClassAck.h b/src/messages/MClassAck.h new file mode 100644 index 0000000000000..1b3bbe22026e7 --- /dev/null +++ b/src/messages/MClassAck.h @@ -0,0 +1,44 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MLOGACK_H +#define __MLOGACK_H + +#include "include/ClassEntry.h" + +class MClassAck : public Message { +public: + ceph_fsid_t fsid; + version_t last; + + MClassAck() : Message(MSG_LOGACK) {} + MClassAck(ceph_fsid_t& f, version_t l) : Message(MSG_LOGACK), fsid(f), last(l) {} + + const char *get_type_name() { return "class_ack"; } + void print(ostream& out) { + out << "class(last " << last << ")"; + } + + void encode_payload() { + ::encode(fsid, payload); + ::encode(last, payload); + } + void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(fsid, p); + ::decode(last, p); + } +}; + +#endif diff --git a/src/mon/ClassMonitor.cc b/src/mon/ClassMonitor.cc new file mode 100644 index 0000000000000..18488f230927f --- /dev/null +++ b/src/mon/ClassMonitor.cc @@ -0,0 +1,272 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "ClassMonitor.h" +#include "Monitor.h" +#include "MonitorStore.h" + +#include "messages/MMonCommand.h" +#include "messages/MClass.h" +#include "messages/MClassAck.h" + +#include "common/Timer.h" + +#include "osd/osd_types.h" +#include "osd/PG.h" // yuck + +#include "config.h" +#include + +#define DOUT_SUBSYS mon +#undef dout_prefix +#define dout_prefix _prefix(mon, paxos->get_version()) +static ostream& _prefix(Monitor *mon, version_t v) { + return *_dout << dbeginl + << "mon" << mon->whoami + << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) + << ".class v" << v << " "; +} + +ostream& operator<<(ostream& out, ClassMonitor& pm) +{ + std::stringstream ss; + + return out << "class"; +} + +/* + Tick function to update the map based on performance every N seconds +*/ + +void ClassMonitor::tick() +{ + if (!paxos->is_active()) return; + + update_from_paxos(); + dout(10) << *this << dendl; + + if (!mon->is_leader()) return; + +} + +void ClassMonitor::create_initial(bufferlist& bl) +{ + dout(10) << "create_initial -- creating initial map" << dendl; + ClassImpl i; + i.name = "test"; + i.version = 0; + i.seq = 0; + i.stamp = g_clock.now(); + ClassLibraryIncremental inc; + ::encode(i, inc.impl); + inc.add = false; +#if 0 + pending_class.insert(pair(e.stamp, e)); + e.name = "test2"; + e.version = 12; + e.seq = 1; +#endif + pending_class.insert(pair(i.stamp, inc)); +} + +bool ClassMonitor::update_from_paxos() +{ + version_t paxosv = paxos->get_version(); + + if (paxosv == list.version) return true; + assert(paxosv >= list.version); + + dout(0) << "ClassMonitor::update_from_paxos() paxosv=" << paxosv << " list.version=" << list.version << dendl; + + bufferlist blog; + + if (list.version == 0 && paxosv > 1) { + // startup: just load latest full map + bufferlist latest; + version_t v = paxos->get_latest(latest); + if (v) { + dout(7) << "update_from_paxos startup: loading summary e" << v << dendl; + bufferlist::iterator p = latest.begin(); + ::decode(list, p); + } + } + + // walk through incrementals + while (paxosv > list.version) { + bufferlist bl; + bool success = paxos->read(list.version+1, bl); + assert(success); + + bufferlist::iterator p = bl.begin(); + ClassLibraryIncremental inc; + inc.decode(p); + if (inc.add) { + ClassImpl impl; + bufferlist::iterator impl_iter = inc.impl.begin(); + impl.decode(impl_iter); + mon->store->put_bl_ss(inc.impl, "class_impl", impl.name.c_str()); + } else { + /* this is a removal */ + } + + list.version++; + } + + bufferlist bl; + ::encode(list, bl); + paxos->stash_latest(paxosv, bl); + + return true; +} + +void ClassMonitor::create_pending() +{ + pending_class.clear(); + pending_list = list; + dout(10) << "create_pending v " << (paxos->get_version() + 1) << dendl; +} + +void ClassMonitor::encode_pending(bufferlist &bl) +{ + dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl; + for (multimap::iterator p = pending_class.begin(); + p != pending_class.end(); + p++) + p->second.encode(bl); +} + +bool ClassMonitor::preprocess_query(Message *m) +{ + dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; + switch (m->get_type()) { + case MSG_MON_COMMAND: + return preprocess_command((MMonCommand*)m); + + case MSG_LOG: + return preprocess_class((MClass*)m); + + default: + assert(0); + delete m; + return true; + } +} + +bool ClassMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; + switch (m->get_type()) { + case MSG_MON_COMMAND: + return prepare_command((MMonCommand*)m); + case MSG_CLASS: + return prepare_class((MClass*)m); + default: + assert(0); + delete m; + return false; + } +} + +void ClassMonitor::committed() +{ + +} + +bool ClassMonitor::preprocess_class(MClass *m) +{ + dout(10) << "preprocess_class " << *m << " from " << m->get_orig_source() << dendl; + + int num_new = 0; + for (deque::iterator p = m->entries.begin(); + p != m->entries.end(); + p++) { + ClassImpl impl; + p->decode_impl(impl); + if (!pending_list.contains(impl.name)) + num_new++; + } + if (!num_new) { + dout(10) << " nothing new" << dendl; + return true; + } + return false; +} + +bool ClassMonitor::prepare_class(MClass *m) +{ + dout(10) << "prepare_class " << *m << " from " << m->get_orig_source() << dendl; + + if (ceph_fsid_compare(&m->fsid, &mon->monmap->fsid)) { + dout(0) << "handle_class on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl; + delete m; + return false; + } + + for (deque::iterator p = m->entries.begin(); + p != m->entries.end(); + p++) { + ClassImpl impl; + p->decode_impl(impl); + dout(10) << " writing class " << impl << dendl; + if (!pending_list.contains(impl.name)) { + pending_list.add(impl.name, impl.version); + pending_class.insert(pair(impl.stamp, *p)); + } + } + + paxos->wait_for_commit(new C_Class(this, m, m->get_orig_source_inst())); + return true; +} + +void ClassMonitor::_updated_class(MClass *m, entity_inst_t who) +{ + dout(7) << "_updated_class for " << who << dendl; + ClassImpl impl; + m->entries.rbegin()->decode_impl(impl); + mon->messenger->send_message(new MClassAck(m->fsid, impl.seq), who); + delete m; +} + + + +bool ClassMonitor::preprocess_command(MMonCommand *m) +{ + int r = -1; + bufferlist rdata; + stringstream ss; + + if (r != -1) { + string rs; + getline(ss, rs); + mon->reply_command(m, r, rs, rdata); + return true; + } else + return false; +} + + +bool ClassMonitor::prepare_command(MMonCommand *m) +{ + stringstream ss; + string rs; + int err = -EINVAL; + + // nothing here yet + ss << "unrecognized command"; + + getline(ss, rs); + mon->reply_command(m, err, rs); + return false; +} diff --git a/src/mon/ClassMonitor.h b/src/mon/ClassMonitor.h new file mode 100644 index 0000000000000..f9b972d213eaf --- /dev/null +++ b/src/mon/ClassMonitor.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __CLASSMONITOR_H +#define __CLASSMONITOR_H + +#include +#include +using namespace std; + +#include "include/types.h" +#include "msg/Messenger.h" +#include "PaxosService.h" + +#include "include/ClassEntry.h" + +class MMonCommand; +class MClass; + +class ClassMonitor : public PaxosService { +private: + multimap pending_class; + ClassList pending_list, list; + + void create_initial(bufferlist& bl); + bool update_from_paxos(); + void create_pending(); // prepare a new pending + void encode_pending(bufferlist &bl); // propose pending update to peers + + void committed(); + + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); + + bool preprocess_class(MClass *m); + bool prepare_class(MClass *m); + void _updated_class(MClass *m, entity_inst_t who); + + struct C_Class : public Context { + ClassMonitor *classmon; + MClass *ack; + entity_inst_t who; + C_Class(ClassMonitor *p, MClass *a, entity_inst_t w) : classmon(p), ack(a), who(w) {} + void finish(int r) { + classmon->_updated_class(ack, who); + } + }; + + bool preprocess_command(MMonCommand *m); + bool prepare_command(MMonCommand *m); + + public: + ClassMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } + + void tick(); // check state, take actions +}; + +#endif diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index dadded02c05e3..dd2bb4d6cd4ea 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -41,6 +41,7 @@ #include "ClientMonitor.h" #include "PGMonitor.h" #include "LogMonitor.h" +#include "ClassMonitor.h" #include "osd/OSDMap.h" @@ -84,6 +85,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map) : paxos_service[PAXOS_CLIENTMAP] = new ClientMonitor(this, add_paxos(PAXOS_CLIENTMAP)); paxos_service[PAXOS_PGMAP] = new PGMonitor(this, add_paxos(PAXOS_PGMAP)); paxos_service[PAXOS_LOG] = new LogMonitor(this, add_paxos(PAXOS_LOG)); + paxos_service[PAXOS_CLASS] = new ClassMonitor(this, add_paxos(PAXOS_CLASS)); } Paxos *Monitor::add_paxos(int type) diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 93e22695e9388..3fde8b3b3aa16 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -20,7 +20,8 @@ #define PAXOS_OSDMAP 2 #define PAXOS_CLIENTMAP 3 #define PAXOS_LOG 4 -#define PAXOS_NUM 5 +#define PAXOS_CLASS 5 +#define PAXOS_NUM 6 inline const char *get_paxos_name(int p) { switch (p) { @@ -29,6 +30,7 @@ inline const char *get_paxos_name(int p) { case PAXOS_CLIENTMAP: return "clientmap"; case PAXOS_PGMAP: return "pgmap"; case PAXOS_LOG: return "logm"; + case PAXOS_CLASS: return "class"; default: assert(0); return 0; } } diff --git a/src/msg/Message.h b/src/msg/Message.h index 4d584b1269330..c0c6fe555f790 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -29,6 +29,7 @@ #define MSG_LOGACK 53 #define MSG_MON_OBSERVE 54 #define MSG_MON_OBSERVE_NOTIFY 55 +#define MSG_CLASS 56 // osd internal #define MSG_OSD_PING 70 -- 2.39.5