From 651ff7495a060b8851b0e23fc8793857e4cada90 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 11 May 2009 15:38:03 -0700 Subject: [PATCH] class: partial implementation of class osd mon messages --- src/messages/MClass.h | 50 ++++++++++++++++++++++++++++++---------- src/messages/MClassAck.h | 8 +++---- src/mon/ClassMonitor.cc | 35 +++++++++++++--------------- src/mon/OSDMonitor.cc | 9 +++++++- src/mon/OSDMonitor.h | 3 +++ src/msg/Message.cc | 6 +++++ src/osd/OSD.cc | 14 +++++++++++ src/osd/OSD.h | 2 ++ src/osd/ReplicatedPG.cc | 3 +++ 9 files changed, 94 insertions(+), 36 deletions(-) diff --git a/src/messages/MClass.h b/src/messages/MClass.h index 024ca1cb3bf76..cbc5218b0e869 100644 --- a/src/messages/MClass.h +++ b/src/messages/MClass.h @@ -17,29 +17,51 @@ #include "include/ClassEntry.h" +enum { + CLASS_NOOP = 0, + CLASS_GET, + CLASS_SET, + CLASS_RESPONSE, +}; + class MClass : public Message { public: ceph_fsid_t fsid; - deque entries; + deque info; + deque impl; + deque add; version_t last; __s32 action; - enum { - CLASS_NOOP = 0, - CLASS_ADD, - CLASS_REMOVE, - CLASS_GET, - }; - + MClass() : Message(MSG_CLASS) {} +#if 0 MClass(ceph_fsid_t& f, deque& e) : Message(MSG_CLASS), fsid(f), entries(e), last(0), action(0) { } +#endif MClass(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS), fsid(f), last(l) {} const char *get_type_name() { return "class"; } void print(ostream& out) { out << "class("; - if (entries.size()) - out << entries.size() << " entries"; + switch (action) { + case CLASS_NOOP: + out << "NOOP, "; + break; + case CLASS_GET: + out << "GET, "; + break; + case CLASS_SET: + out << "SET, "; + break; + case CLASS_RESPONSE: + out << "SET, "; + break; + default: + out << "Unknown op, "; + break; + } + if (info.size()) + out << info.size() << " entries"; if (last) out << "last " << last; out << ")"; @@ -47,14 +69,18 @@ public: void encode_payload() { ::encode(fsid, payload); - ::encode(entries, payload); + ::encode(info, payload); + ::encode(impl, payload); + ::encode(add, payload); ::encode(last, payload); ::encode(action, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(fsid, p); - ::decode(entries, p); + ::decode(info, payload); + ::decode(impl, payload); + ::decode(add, payload); ::decode(last, p); ::decode(action, payload); } diff --git a/src/messages/MClassAck.h b/src/messages/MClassAck.h index 1b3bbe22026e7..29754a754b56a 100644 --- a/src/messages/MClassAck.h +++ b/src/messages/MClassAck.h @@ -12,8 +12,8 @@ * */ -#ifndef __MLOGACK_H -#define __MLOGACK_H +#ifndef __MCLASSACK_H +#define __MCLASSACK_H #include "include/ClassEntry.h" @@ -22,8 +22,8 @@ public: ceph_fsid_t fsid; version_t last; - MClassAck() : Message(MSG_LOGACK) {} - MClassAck(ceph_fsid_t& f, version_t l) : Message(MSG_LOGACK), fsid(f), last(l) {} + MClassAck() : Message(MSG_CLASS_ACK) {} + MClassAck(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS_ACK), fsid(f), last(l) {} const char *get_type_name() { return "class_ack"; } void print(ostream& out) { diff --git a/src/mon/ClassMonitor.cc b/src/mon/ClassMonitor.cc index f1da099ae2365..cf031b7ff3ea2 100644 --- a/src/mon/ClassMonitor.cc +++ b/src/mon/ClassMonitor.cc @@ -202,12 +202,10 @@ bool ClassMonitor::preprocess_class(MClass *m) dout(10) << "preprocess_class " << *m << " from " << m->get_orig_source() << dendl; int num_new = 0; - for (deque::iterator p = m->entries.begin(); - p != m->entries.end(); + for (deque::iterator p = m->info.begin(); + p != m->info.end(); p++) { - ClassLibrary info; - p->decode_info(info); - if (!pending_list.contains(info.name)) + if (!pending_list.contains((*p).name)) num_new++; } if (!num_new) { @@ -226,18 +224,18 @@ bool ClassMonitor::prepare_class(MClass *m) delete m; return false; } - - for (deque::iterator p = m->entries.begin(); - p != m->entries.end(); - p++) { - ClassLibrary info; - ClassImpl impl; - p->decode_info(info); - p->decode_impl(impl); - dout(10) << " writing class " << info << dendl; - if (!pending_list.contains(info.name)) { - pending_list.add(info); - pending_class.insert(pair(impl.stamp, *p)); + deque::iterator impl_iter = m->impl.begin(); + + for (deque::iterator p = m->info.begin(); + p != m->info.end(); + p++, impl_iter++) { + dout(10) << " writing class " << *p << dendl; + if (!pending_list.contains((*p).name)) { + ClassLibraryIncremental inc; + ::encode(*p, inc.info); + ::encode(*impl_iter, inc.impl); + pending_list.add(*p); + pending_class.insert(pair((*impl_iter).stamp, inc)); } } @@ -248,8 +246,7 @@ bool ClassMonitor::prepare_class(MClass *m) void ClassMonitor::_updated_class(MClass *m, entity_inst_t who) { dout(7) << "_updated_class for " << who << dendl; - ClassImpl impl; - m->entries.rbegin()->decode_impl(impl); + ClassImpl impl = *(m->impl.rbegin()); mon->messenger->send_message(new MClassAck(m->fsid, impl.seq), who); delete m; } diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 8092afcb40c36..9f681b3aff365 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -29,6 +29,7 @@ #include "messages/MMonCommand.h" #include "messages/MRemoveSnaps.h" #include "messages/MOSDScrub.h" +#include "messages/MClass.h" #include "common/Timer.h" @@ -274,6 +275,9 @@ bool OSDMonitor::preprocess_query(Message *m) case MSG_REMOVE_SNAPS: return preprocess_remove_snaps((MRemoveSnaps*)m); + case MSG_CLASS: + handle_class((MClass*)m); + return true; default: assert(0); delete m; @@ -361,7 +365,10 @@ void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) delete m; } - +void OSDMonitor::handle_class(MClass *m) +{ + dout(0) << "OSDMonitor::handle_class" << dendl; +} // --------------------------- // UPDATEs diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 2a93bda44b5ea..8a8d77d9298f5 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -32,6 +32,7 @@ using namespace std; class Monitor; class MOSDBoot; class MMonCommand; +class MClass; class OSDMonitor : public PaxosService { public: @@ -71,6 +72,8 @@ private: void handle_osd_getmap(class MOSDGetMap *m); + void handle_class(class MClass *m); + bool preprocess_failure(class MOSDFailure *m); bool prepare_failure(class MOSDFailure *m); void _reported_failure(MOSDFailure *m); diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 6c886b8707f91..55820a9a3ee4a 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -111,6 +111,8 @@ using namespace std; #include "messages/MLock.h" +#include "messages/MClass.h" + #include "config.h" #define DEBUGLVL 10 // debug level of output @@ -432,6 +434,10 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer, m = new MGenericMessage(type); break; + case MSG_CLASS: + m = new MClass(); + break; + default: dout(0) << "can't decode unknown message type " << type << dendl; assert(0); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index bd1ef34446270..b15b5b2299124 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -67,6 +67,8 @@ #include "messages/MPGStats.h" #include "messages/MPGStatsAck.h" +#include "messages/MClass.h" + #include "common/Logger.h" #include "common/LogType.h" #include "common/Timer.h" @@ -3676,4 +3678,16 @@ void OSD::wait_for_no_ops() } +void OSD::get_class(const char *name) +{ + MClass *m = new MClass(osdmap->get_fsid(), 0); + ClassLibrary info; + info.name = name; + m->info.push_back(info); + m->action = CLASS_GET; + int mon = monmap->pick_mon(); + dout(0) << "sending class message " << *m << " to mon" << mon << dendl; + messenger->send_message(m, + monmap->get_inst(mon)); +} diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 67353e5c8fe57..59e77f286958c 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -762,6 +762,8 @@ private: void force_remount(); LogClient *get_logclient() { return &logclient; } + + void get_class(const char *name); }; #endif diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 138ee8425b27c..a33b3acc42b58 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -741,6 +741,8 @@ void ReplicatedPG::op_read(MOSDOp *op) case CEPH_OSD_OP_EXEC: { + dout(0) << "CEPH_OSD_OP_EXEC" << dendl; + osd->get_class("test"); bufferlist bl; int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl); @@ -750,6 +752,7 @@ void ReplicatedPG::op_read(MOSDOp *op) if (r >= 0) { p->length = r; char *buf = bl.c_str(); + for (unsigned int i=0; i