]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/msg: PThey mostly hold version_t's now. Unused, though.
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 23 Jun 2009 21:03:34 +0000 (14:03 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 24 Jun 2009 19:27:55 +0000 (12:27 -0700)
36 files changed:
src/Makefile.am
src/messages/MClass.h
src/messages/MClassAck.h
src/messages/MGetPoolStats.h
src/messages/MGetPoolStatsReply.h
src/messages/MLog.h
src/messages/MLogAck.h
src/messages/MMDSBeacon.h
src/messages/MMonCommand.h
src/messages/MMonCommandAck.h
src/messages/MMonObserve.h
src/messages/MMonObserveNotify.h
src/messages/MOSDBoot.h
src/messages/MPoolSnap.h
src/messages/MPoolSnapReply.h
src/mon/ClassMonitor.cc
src/mon/ClassMonitor.h
src/mon/ClientMonitor.cc
src/mon/ClientMonitor.h
src/mon/LogMonitor.cc
src/mon/LogMonitor.h
src/mon/MDSMonitor.cc
src/mon/MDSMonitor.h
src/mon/Monitor.cc
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/mon/PGMonitor.cc
src/mon/PGMonitor.h
src/mon/Paxos.cc
src/mon/Paxos.h
src/mon/PaxosService.cc
src/mon/PaxosService.h
src/msg/Message.cc
src/msg/Message.h
src/osd/ReplicatedPG.cc
src/osdc/Objecter.cc

index 7ddb77e149dbaa00061496bec86c1b695dd77cfb..eb8b0483ccf0c5428f0cfa045083e88914c0a651 100644 (file)
@@ -580,6 +580,7 @@ noinst_HEADERS = \
         messages/MRemoveSnaps.h\
         messages/MStatfs.h\
         messages/MStatfsReply.h\
+       messages/PaxosServiceMessage.h\
         mon/ClassMonitor.h\
         mon/ClientMap.h\
         mon/ClientMonitor.h\
index d9847a599c279e8e5b1bac49b57fb9ef16fd5f51..01e87ad59a2d6f317dbb5b8224ba625e42c0bff3 100644 (file)
@@ -24,7 +24,7 @@ enum {
    CLASS_RESPONSE,
 };
 
-class MClass : public Message {
+class MClass : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   deque<ClassInfo> info;
@@ -34,11 +34,13 @@ public:
   __s32 action;
 
 
-  MClass() : Message(MSG_CLASS) {}
+  MClass() : PaxosServiceMessage(MSG_CLASS, 0) {}
 #if 0
-  MClass(ceph_fsid_t& f, deque<ClassLibraryIncremental>& e) : Message(MSG_CLASS), fsid(f), entries(e), last(0), action(0) { }
+  MClass(ceph_fsid_t& f, deque<ClassLibraryIncremental>& e) :
+    PaxosServiceMessage(MSG_CLASS, 0),
+    fsid(f), entries(e), last(0), action(0) {}
 #endif
-  MClass(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS), fsid(f), last(l) {}
+  MClass(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_CLASS, l), fsid(f), last(l) {}
 
   const char *get_type_name() { return "class"; }
   void print(ostream& out) {
@@ -68,6 +70,7 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(info, payload);
     ::encode(impl, payload);
@@ -77,6 +80,7 @@ public:
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(info, p);
     ::decode(impl, p);
index e513a7329b73864542cbfcb6016498dcc142d2c7..2d8df36a18d3e1638d86fe53df354100cc91beaa 100644 (file)
 
 #include "include/ClassLibrary.h"
 
-class MClassAck : public Message {
+class MClassAck : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   version_t last;
   
-  MClassAck() : Message(MSG_CLASS_ACK) {}
-  MClassAck(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS_ACK), fsid(f), last(l) {}
+  MClassAck() : PaxosServiceMessage(MSG_CLASS_ACK, 0) {}
+  MClassAck(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_CLASS_ACK, l), fsid(f), last(l) {}
 
   const char *get_type_name() { return "class_ack"; }
   void print(ostream& out) {
@@ -31,11 +31,13 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(last, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(last, p);
   }
index a1ccf2fc26f85e7fdd222b5b54de1acd3ef720c6..5365416b14349339a01a4eed0eb9a7f3752522f5 100644 (file)
 #ifndef __MGETPOOLSTATS_H
 #define __MGETPOOLSTATS_H
 
-class MGetPoolStats : public Message {
+#include "messages/PaxosServiceMessage.h"
+
+class MGetPoolStats : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   tid_t tid;
   vector<string> pools;
 
-  MGetPoolStats() : Message(MSG_GETPOOLSTATS) {}
-  MGetPoolStats(ceph_fsid_t& f, tid_t t, vector<string>& ls) :
-    Message(MSG_GETPOOLSTATS),
+  MGetPoolStats() : PaxosServiceMessage(MSG_GETPOOLSTATS, 0) {}
+  MGetPoolStats(ceph_fsid_t& f, tid_t t, vector<string>& ls, version_t l) :
+    PaxosServiceMessage(MSG_GETPOOLSTATS, l),
     fsid(f), tid(t), pools(ls) { }
 
   const char *get_type_name() { return "getpoolstats"; }
@@ -33,12 +35,14 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(tid, payload);
     ::encode(pools, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(tid, p);
     ::decode(pools, p);
index 0126fd15c134a0d24f3ef0cc44a267815b8a128e..1dbbbebecdcc1fcce741479dc9d3e975b2625a5f 100644 (file)
 #ifndef __MGETPOOLSTATSREPLY_H
 #define __MGETPOOLSTATSREPLY_H
 
-class MGetPoolStatsReply : public Message {
+class MGetPoolStatsReply : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   tid_t tid;
   map<string,pool_stat_t> pool_stats;
 
-  MGetPoolStatsReply() : Message(MSG_GETPOOLSTATSREPLY) {}
-  MGetPoolStatsReply(ceph_fsid_t& f, tid_t t) :
-    Message(MSG_GETPOOLSTATSREPLY),
+  MGetPoolStatsReply() : PaxosServiceMessage(MSG_GETPOOLSTATSREPLY, 0) {}
+  MGetPoolStatsReply(ceph_fsid_t& f, tid_t t, version_t v) :
+    PaxosServiceMessage(MSG_GETPOOLSTATSREPLY, v),
     fsid(f), tid(t) { }
 
   const char *get_type_name() { return "getpoolstats"; }
@@ -33,12 +33,14 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(tid, payload);
     ::encode(pool_stats, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(tid, p);
     ::decode(pool_stats, p);
index de94845b9bf80c626ae4cda2f25118699cc3a588..5ffe7752ed8aec88fa58b4b1b56996dee5d92996 100644 (file)
 
 #include "include/LogEntry.h"
 
-class MLog : public Message {
+class MLog : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   deque<LogEntry> entries;
   version_t last;
   
-  MLog() : Message(MSG_LOG) {}
-  MLog(ceph_fsid_t& f, deque<LogEntry>& e) : Message(MSG_LOG), fsid(f), entries(e), last(0) { }
-  MLog(ceph_fsid_t& f, version_t l) : Message(MSG_LOG), fsid(f), last(l) {}
+  MLog() : PaxosServiceMessage(MSG_LOG, 0) {}
+  MLog(ceph_fsid_t& f, deque<LogEntry>& e) : PaxosServiceMessage(MSG_LOG, 0), fsid(f), entries(e), last(0) { }
+  MLog(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_LOG, l), fsid(f), last(l) {}
 
   const char *get_type_name() { return "log"; }
   void print(ostream& out) {
@@ -38,12 +38,14 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(entries, payload);
     ::encode(last, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(entries, p);
     ::decode(last, p);
index f4d4286e0728ad7b35bbd392941fd3abccd4ca75..ae188ff6893a67763cd2653370ae78ac70a6d206 100644 (file)
 #define __MLOGACK_H
 
 #include "include/LogEntry.h"
+#include "messages/PaxosServiceMessage.h"
 
-class MLogAck : public Message {
+class MLogAck : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   version_t last;
   
-  MLogAck() : Message(MSG_LOGACK) {}
-  MLogAck(ceph_fsid_t& f, version_t l) : Message(MSG_LOGACK), fsid(f), last(l) {}
+  MLogAck() : PaxosServiceMessage(MSG_LOGACK, 0) {}
+  MLogAck(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_LOGACK, l), fsid(f), last(l) {}
 
   const char *get_type_name() { return "log_ack"; }
   void print(ostream& out) {
@@ -31,11 +32,13 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(last, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(last, p);
   }
index ebde2fa8080e6fdfa68395aa04a3bf3da3fec9ce..ce86c1ed8d87ad5e6fb2de260b62cb06fc4faeca 100644 (file)
 #ifndef __MMDSBEACON_H
 #define __MMDSBEACON_H
 
-#include "msg/Message.h"
+#include "messages/PaxosServiceMessage.h"
 
 #include "include/types.h"
 
 #include "mds/MDSMap.h"
 
-class MMDSBeacon : public Message {
+class MMDSBeacon : public PaxosServiceMessage {
   ceph_fsid_t fsid;
   string name;
   epoch_t last_epoch_seen;  // include last mdsmap epoch mds has seen to avoid race with monitor decree
@@ -31,9 +31,9 @@ class MMDSBeacon : public Message {
   string standby_for_name;
 
  public:
-  MMDSBeacon() : Message(MSG_MDS_BEACON) {}
+  MMDSBeacon() : PaxosServiceMessage(MSG_MDS_BEACON, 0) {}
   MMDSBeacon(ceph_fsid_t &f, string& n, epoch_t les, int st, version_t se) : 
-    Message(MSG_MDS_BEACON), 
+    PaxosServiceMessage(MSG_MDS_BEACON, se), 
     fsid(f), name(n), last_epoch_seen(les), state(st), seq(se),
     standby_for_rank(-1) { }
 
@@ -55,6 +55,7 @@ class MMDSBeacon : public Message {
   }
   
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(last_epoch_seen, payload);
     ::encode(state, payload);
@@ -65,6 +66,7 @@ class MMDSBeacon : public Message {
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(last_epoch_seen, p);
     ::decode(state, p);
index 4d7634c5906a902df7895e8e98c0d5b9fdbb1d01..2a2fc9660b946a3df2b8629ba83332e8d9d0aff9 100644 (file)
 #ifndef __MMONCOMMAND_H
 #define __MMONCOMMAND_H
 
-#include "msg/Message.h"
+#include "messages/PaxosServiceMessage.h"
 
 #include <vector>
 using std::vector;
 
-class MMonCommand : public Message {
+class MMonCommand : public PaxosServiceMessage {
  public:
   ceph_fsid_t fsid;
   vector<string> cmd;
 
-  MMonCommand() : Message(MSG_MON_COMMAND) {}
-  MMonCommand(ceph_fsid_t &f) : 
-    Message(MSG_MON_COMMAND),
+  MMonCommand() : PaxosServiceMessage(MSG_MON_COMMAND, 0) {}
+  MMonCommand(ceph_fsid_t &f, version_t version) : 
+    PaxosServiceMessage(MSG_MON_COMMAND, version),
     fsid(f) { }
   
   const char *get_type_name() { return "mon_command"; }
@@ -41,11 +41,13 @@ class MMonCommand : public Message {
   }
   
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(cmd, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(cmd, p);
   }
index cf3f95c84f0e8530fc6e344e38d576291fa5641c..6c0081133c832b12c8bed3ddbe95910293aacab5 100644 (file)
 
 #include "msg/Message.h"
 
-class MMonCommandAck : public Message {
+class MMonCommandAck : public PaxosServiceMessage {
  public:
   vector<string> cmd;
   __s32 r;
   string rs;
   
-  MMonCommandAck() : Message(MSG_MON_COMMAND_ACK) {}
-  MMonCommandAck(vector<string>& c, int _r, string s) : 
-    Message(MSG_MON_COMMAND_ACK),
+  MMonCommandAck() : PaxosServiceMessage(MSG_MON_COMMAND_ACK, 0) {}
+  MMonCommandAck(vector<string>& c, int _r, string s, version_t v) : 
+    PaxosServiceMessage(MSG_MON_COMMAND_ACK, v),
     cmd(c), r(_r), rs(s) { }
   
   const char *get_type_name() { return "mon_command"; }
@@ -34,12 +34,14 @@ class MMonCommandAck : public Message {
   }
   
   void encode_payload() {
+    paxos_encode();
     ::encode(r, payload);
     ::encode(rs, payload);
     ::encode(cmd, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(r, p);
     ::decode(rs, p);
     ::decode(cmd, p);
index 2cff2e68a12b651f160c47c5276a198f3cc7d711..2a8fca2b4bb7accb5c4c1fb5bf3cd4f6af80c837 100644 (file)
 #include <vector>
 using std::vector;
 
-class MMonObserve : public Message {
+class MMonObserve : public PaxosServiceMessage {
  public:
   ceph_fsid_t fsid;
   uint32_t machine_id;
   version_t ver;
 
-  MMonObserve() : Message(MSG_MON_OBSERVE) {}
+  MMonObserve() : PaxosServiceMessage(MSG_MON_OBSERVE, 0) {}
   MMonObserve(ceph_fsid_t &f, int mid, version_t v) : 
-    Message(MSG_MON_OBSERVE),
+    PaxosServiceMessage(MSG_MON_OBSERVE, v),
     fsid(f), machine_id(mid), ver(v) { }
   
   const char *get_type_name() { return "mon_observe"; }
@@ -37,12 +37,14 @@ class MMonObserve : public Message {
   }
   
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(machine_id, payload);
     ::encode(ver, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(machine_id, p);
     ::decode(ver, p);
index df732b06fce1a95b03b1c28981e1a0b8edfcb45c..1300b85fe879ac8f0c5ce9f8124ac81055fa05dd 100644 (file)
@@ -17,7 +17,7 @@
 
 #include "msg/Message.h"
 
-class MMonObserveNotify : public Message {
+class MMonObserveNotify : public PaxosServiceMessage {
  public:
   ceph_fsid_t fsid;
   int32_t machine_id;
@@ -25,9 +25,9 @@ class MMonObserveNotify : public Message {
   version_t ver;
   bool is_latest;
   
-  MMonObserveNotify() : Message(MSG_MON_OBSERVE_NOTIFY) {}
+  MMonObserveNotify() : PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, 0) {}
   MMonObserveNotify(ceph_fsid_t& f, int id, bufferlist& b, version_t v, bool l) :
-    Message(MSG_MON_OBSERVE_NOTIFY), fsid(f), machine_id(id), bl(b), ver(v), is_latest(l) {}
+    PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, v), fsid(f), machine_id(id), bl(b), ver(v), is_latest(l) {}
     
   
   const char *get_type_name() { return "mon_observe_notify"; }
@@ -39,6 +39,7 @@ class MMonObserveNotify : public Message {
   }
   
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(machine_id, payload);
     ::encode(bl, payload);
@@ -47,6 +48,7 @@ class MMonObserveNotify : public Message {
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(machine_id, p);
     ::decode(bl, p);
index 90d30bfe6e7356a9e4265240ec1c55b919855ff3..747a78f53c60877bb7ca804d91d55deb79c1802e 100644 (file)
 #include "include/types.h"
 #include "osd/osd_types.h"
 
-class MOSDBoot : public Message {
+class MOSDBoot : public PaxosServiceMessage {
  public:
   OSDSuperblock sb;
 
-  MOSDBoot() {}
-  MOSDBoot(OSDSuperblock& s) : 
-    Message(MSG_OSD_BOOT),
-    sb(s) {
+  MOSDBoot() : PaxosServiceMessage(){}
+  MOSDBoot(OSDSuperblock& s, version_t v) : 
+    PaxosServiceMessage(MSG_OSD_BOOT, v), sb(s) {
   }
 
   const char *get_type_name() { return "osd_boot"; }
index 33aa7799671fed0507556c0db9b16dcf2e9ec9b9..7bb571596e85b5117be21959da5cf2c5e76325d3 100644 (file)
 #ifndef __MPOOLSNAP_H
 #define __MPOOLSNAP_H
 
+#include "messages/PaxosServiceMessage.h"
 
-class MPoolSnap : public Message {
+
+class MPoolSnap : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   tid_t tid;
@@ -24,9 +26,9 @@ public:
   string name;
   bool create;
 
-  MPoolSnap() : Message(MSG_POOLSNAP) {}
-  MPoolSnap( ceph_fsid_t& f, tid_t t, int p, string& n, bool c) :
-    Message(MSG_POOLSNAP), fsid(f), tid(t), pool(p), name(n), create(c) {}
+  MPoolSnap() : PaxosServiceMessage(MSG_POOLSNAP, 0) {}
+  MPoolSnap( ceph_fsid_t& f, tid_t t, int p, string& n, bool c, version_t v) :
+    PaxosServiceMessage(MSG_POOLSNAP, v), fsid(f), tid(t), pool(p), name(n), create(c) {}
 
   const char *get_type_name() { return "poolsnap"; }
   void print(ostream& out) {
@@ -34,6 +36,7 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(tid, payload);
     ::encode(pool, payload);
@@ -42,6 +45,7 @@ public:
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(tid, p);
     ::decode(pool, p);
index e4b6360afc76879c463488ec625c54fec893968b..42422f7b01caf30bd2cdfff3cd62219b81d23eba 100644 (file)
@@ -16,7 +16,7 @@
 #define __MPOOLSNAPREPLY_H
 
 
-class MPoolSnapReply : public Message {
+class MPoolSnapReply : public PaxosServiceMessage {
 public:
   ceph_fsid_t fsid;
   tid_t tid;
@@ -24,9 +24,9 @@ public:
   epoch_t epoch;
 
 
-  MPoolSnapReply() : Message(MSG_POOLSNAPREPLY) {}
-  MPoolSnapReply( ceph_fsid_t& f, tid_t t, int rc, int e) :
-    Message(MSG_POOLSNAPREPLY), fsid(f), tid(t), replyCode(rc), epoch(e) {}
+  MPoolSnapReply() : PaxosServiceMessage(MSG_POOLSNAPREPLY, 0) {}
+  MPoolSnapReply( ceph_fsid_t& f, tid_t t, int rc, int e, version_t v) :
+    PaxosServiceMessage(MSG_POOLSNAPREPLY, v), fsid(f), tid(t), replyCode(rc), epoch(e) {}
 
   const char *get_type_name() { return "poolsnapreply"; }
 
@@ -35,6 +35,7 @@ public:
   }
 
   void encode_payload() {
+    paxos_encode();
     ::encode(fsid, payload);
     ::encode(tid, payload);
     ::encode(replyCode, payload);
@@ -42,6 +43,7 @@ public:
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
     ::decode(fsid, p);
     ::decode(tid, p);
     ::decode(replyCode, p);
index 2e23d24398906b3a344c17c64ed8d329e895a03e..ee6108394b5986d897a1de690b155b4e01be9155 100644 (file)
@@ -179,7 +179,7 @@ void ClassMonitor::encode_pending(bufferlist &bl)
     p->second.encode(bl);
 }
 
-bool ClassMonitor::preprocess_query(Message *m)
+bool ClassMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
@@ -196,7 +196,7 @@ bool ClassMonitor::preprocess_query(Message *m)
   }
 }
 
-bool ClassMonitor::prepare_update(Message *m)
+bool ClassMonitor::prepare_update(PaxosServiceMessage *m)
 {
   dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
index c615ed2b09d7447e0dd0c19cc21e2e10f6d6af43..fcb6ce365271e6cfb8ba88867202fcc62750a0ed 100644 (file)
@@ -42,8 +42,8 @@ private:
 
   void committed();
 
-  bool preprocess_query(Message *m);  // true if processed.
-  bool prepare_update(Message *m);
+  bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
+  bool prepare_update(PaxosServiceMessage *m);
 
   bool preprocess_class(MClass *m);
   bool prepare_class(MClass *m);
index ad63f74eb3b41beb47669ba7fbf36c274f192899..d6c0b58b911d6d0632a0f211367ef99c38b20ee1 100644 (file)
@@ -157,7 +157,7 @@ bool ClientMonitor::check_mount(MClientMount *m)
     return false;
 }
 
-bool ClientMonitor::preprocess_query(Message *m)
+bool ClientMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
 
@@ -193,7 +193,7 @@ bool ClientMonitor::preprocess_query(Message *m)
   }
 }
 
-bool ClientMonitor::prepare_update(Message *m)
+bool ClientMonitor::prepare_update(PaxosServiceMessage *m)
 {
   stringstream ss;
   dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
index 412e68a8e92fdb9f038c85c3bc9e18317f174281..dbfa499bda79ab29e0b61ddb4706af7418272f3f 100644 (file)
@@ -52,7 +52,7 @@ public:
       if (r >= 0)
        cmon->_mounted(client, m);
       else
-       cmon->dispatch((Message*)m);
+       cmon->dispatch((PaxosServiceMessage*)m);
     }
   };
 
@@ -66,11 +66,10 @@ public:
       if (r >= 0)
        cmon->_unmounted(m);
       else
-       cmon->dispatch((Message*)m);
+       cmon->dispatch((PaxosServiceMessage*)m);
     }
   };
 
-
   ClientMap client_map;
 
 private:
@@ -88,8 +87,8 @@ private:
   void _mounted(int c, MClientMount *m);
   void _unmounted(MClientUnmount *m);
  
-  bool preprocess_query(Message *m);  // true if processed.
-  bool prepare_update(Message *m);
+  bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
+  bool prepare_update(PaxosServiceMessage *m);
 
   bool preprocess_command(MMonCommand *m);  // true if processed.
   bool prepare_command(MMonCommand *m);
index 08a23d012071152a0de34a89d077ec72d6ec7025..6aa1985e9f1be7c1b4a9e630073c1deb0deda610 100644 (file)
@@ -186,7 +186,7 @@ void LogMonitor::encode_pending(bufferlist &bl)
     p->second.encode(bl);
 }
 
-bool LogMonitor::preprocess_query(Message *m)
+bool LogMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
@@ -203,7 +203,7 @@ bool LogMonitor::preprocess_query(Message *m)
   }
 }
 
-bool LogMonitor::prepare_update(Message *m)
+bool LogMonitor::prepare_update(PaxosServiceMessage *m)
 {
   dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
index b772f215e03d927c2e6619b72a86fd302dc8aa77..6c14b882c100d78ff67bab109fc51e4a6b0fb284 100644 (file)
@@ -40,8 +40,8 @@ private:
 
   void committed();
 
-  bool preprocess_query(Message *m);  // true if processed.
-  bool prepare_update(Message *m);
+  bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
+  bool prepare_update(PaxosServiceMessage *m);
 
   bool preprocess_log(MLog *m);
   bool prepare_log(MLog *m);
index a6663d24dc4cb7361f92a5e1d8a0e3abffd1551a..51e81066670fa0086f02b321f16a54fe81a78fb0 100644 (file)
@@ -125,7 +125,7 @@ void MDSMonitor::encode_pending(bufferlist &bl)
 }
 
 
-bool MDSMonitor::preprocess_query(Message *m)
+bool MDSMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
 
@@ -249,7 +249,7 @@ bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
 }
 
 
-bool MDSMonitor::prepare_update(Message *m)
+bool MDSMonitor::prepare_update(PaxosServiceMessage *m)
 {
   dout(7) << "prepare_update " << *m << dendl;
 
index d8e1d65c4ea4693bbcf0c927152d34c35e641fc8..148947596512c642b2dd2cf39bcc1868e5bcc938 100644 (file)
@@ -54,7 +54,7 @@ class MDSMonitor : public PaxosService {
       if (r >= 0)
        mm->_updated(m);   // success
       else
-       mm->dispatch((Message*)m);        // try again
+       mm->dispatch((PaxosServiceMessage*)m);        // try again
     }
   };
 
@@ -67,8 +67,8 @@ class MDSMonitor : public PaxosService {
   
   void _updated(MMDSBeacon *m);
  
-  bool preprocess_query(Message *m);  // true if processed.
-  bool prepare_update(Message *m);
+  bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
+  bool prepare_update(PaxosServiceMessage *m);
   bool should_propose(double& delay);
 
   void committed();
index e6eb6cdb971ea5629e99bddfd436ccaea385f874..187f212f27c6c87670027c4ec350011a4b92936d 100644 (file)
@@ -20,9 +20,9 @@
 
 #include "MonitorStore.h"
 
-#include "msg/Message.h"
 #include "msg/Messenger.h"
 
+#include "messages/PaxosServiceMessage.h"
 #include "messages/MMonMap.h"
 #include "messages/MMonGetMap.h"
 #include "messages/MGenericMessage.h"
@@ -304,7 +304,7 @@ void Monitor::reply_command(MMonCommand *m, int rc, const string &rs)
 
 void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata)
 {
-  MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs);
+  MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, VERSION_T);
   reply->set_data(rdata);
   messenger->send_message(reply, m->get_orig_source_inst());
   delete m;
@@ -325,7 +325,7 @@ void Monitor::handle_observe(MMonObserve *m)
 void Monitor::inject_args(const entity_inst_t& inst, vector<string>& args)
 {
   dout(10) << "inject_args " << inst << " " << args << dendl;
-  MMonCommand *c = new MMonCommand(monmap->fsid);
+  MMonCommand *c = new MMonCommand(monmap->fsid, VERSION_T);
   c->cmd = args;
   messenger->send_message(c, inst);
 }
index 6beac3c1a01092a5ead568559eb11fdaefdd9cf1..5ffb331c57b231d5f8c168644aa8fd6b689d429a 100644 (file)
@@ -248,7 +248,7 @@ void OSDMonitor::committed()
 
 // -------------
 
-bool OSDMonitor::preprocess_query(Message *m)
+bool OSDMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
 
@@ -286,7 +286,7 @@ bool OSDMonitor::preprocess_query(Message *m)
   }
 }
 
-bool OSDMonitor::prepare_update(Message *m)
+bool OSDMonitor::prepare_update(PaxosServiceMessage *m)
 {
   dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
   
@@ -1317,7 +1317,7 @@ bool OSDMonitor::prepare_pool_snap ( MPoolSnap *m)
 
 void OSDMonitor::_pool_snap(MPoolSnap *m, int replyCode, epoch_t epoch)
 {
-  MPoolSnapReply *reply = new MPoolSnapReply(m->fsid, m->tid, replyCode, epoch);
+  MPoolSnapReply *reply = new MPoolSnapReply(m->fsid, m->tid, replyCode, epoch, VERSION_T);
   mon->messenger->send_message(reply, m->get_orig_source_inst());
   delete m;
 }
index d67127350536d801b1ed26021c2294c74ca56aeb..cc29e68f9782a68dac089e3998452137a00249ee 100644 (file)
@@ -57,9 +57,9 @@ private:
 
   void committed();
 
-  void handle_query(Message *m);
-  bool preprocess_query(Message *m);  // true if processed.
-  bool prepare_update(Message *m);
+  void handle_query(PaxosServiceMessage *m);
+  bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
+  bool prepare_update(PaxosServiceMessage *m);
   bool should_propose(double &delay);
 
   // ...
@@ -97,9 +97,10 @@ private:
       if (r >= 0)
        cmon->_booted(m, true);
       else
-       cmon->dispatch((Message*)m);
+       cmon->dispatch((PaxosServiceMessage*)m);
     }
   };
+
   struct C_Alive : public Context {
     OSDMonitor *osdmon;
     MOSDAlive *m;
@@ -117,7 +118,7 @@ private:
       if (r >= 0)
        cmon->_reported_failure(m);
       else
-       cmon->dispatch((Message*)m);
+       cmon->dispatch((PaxosServiceMessage*)m);
     }
   };
   struct C_Snap : public Context {
index 9471d1f9c9c6bc038ffd5d0558d83f5c7f86fa50..e439e88c2143dbe265b3fa94913bfb9d33a3b982 100644 (file)
@@ -159,7 +159,7 @@ void PGMonitor::encode_pending(bufferlist &bl)
   pending_inc.encode(bl);
 }
 
-bool PGMonitor::preprocess_query(Message *m)
+bool PGMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
@@ -183,7 +183,7 @@ bool PGMonitor::preprocess_query(Message *m)
   }
 }
 
-bool PGMonitor::prepare_update(Message *m)
+bool PGMonitor::prepare_update(PaxosServiceMessage *m)
 {
   dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
@@ -241,7 +241,7 @@ bool PGMonitor::preprocess_getpoolstats(MGetPoolStats *m)
     goto out;
   }
   
-  reply = new MGetPoolStatsReply(m->fsid, m->tid);
+  reply = new MGetPoolStatsReply(m->fsid, m->tid, VERSION_T);
 
   for (vector<string>::iterator p = m->pools.begin();
        p != m->pools.end();
index a9e2d79278c1b2017368bbbd2634ac8530a07146..ec7959c694ce524dcdcccb74a09f10edac999ba1 100644 (file)
@@ -50,8 +50,8 @@ private:
 
   void committed();
 
-  bool preprocess_query(Message *m);  // true if processed.
-  bool prepare_update(Message *m);
+  bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
+  bool prepare_update(PaxosServiceMessage *m);
 
   bool preprocess_pg_stats(MPGStats *stats);
   bool prepare_pg_stats(MPGStats *stats);
index f0188a8056d79666d69427d04c10deed2ff77f13..a9afa3c8c615a1fb2de5c56f5e763ec3f0b1c299 100644 (file)
@@ -762,7 +762,7 @@ void Paxos::election_starting()
 }
 
 
-void Paxos::dispatch(Message *m)
+void Paxos::dispatch(PaxosServiceMessage *m)
 {
   // election in progress?
   if (mon->is_starting()) {
index bb460e462fad0a657dedaea24a50f0f3af3bd5ff..4dfab5f892e6a3b963b182df610ef5baa3db12d3 100644 (file)
@@ -53,7 +53,7 @@ e 12v
 #include "include/types.h"
 #include "mon_types.h"
 #include "include/buffer.h"
-#include "msg/Message.h"
+#include "messages/PaxosServiceMessage.h"
 #include "msg/msg_types.h"
 
 #include "include/Context.h"
@@ -240,7 +240,7 @@ public:
     return machine_name;
   }
 
-  void dispatch(Message *m);
+  void dispatch(PaxosServiceMessage *m);
 
   void init();
 
index 5569914d1e6196e4f16f6b95ab867a8efa6ce21e..5a15fc5a6f879661820d65c9a213361955ccb787 100644 (file)
@@ -36,7 +36,7 @@ const char *PaxosService::get_machine_name()
 }
 
 
-bool PaxosService::dispatch_impl(Message *m)
+bool PaxosService::dispatch(PaxosServiceMessage *m)
 {
   dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl;
   
index 07764c37634cc910c5a1faa11291f2d63bf3a042..68178e561b4568b445bdc673f27c92b88608c0c1 100644 (file)
 #ifndef __PAXOSSERVICE_H
 #define __PAXOSSERVICE_H
 
-#include "msg/Dispatcher.h"
+#include "messages/PaxosServiceMessage.h"
 #include "include/Context.h"
 
 class Monitor;
 class Paxos;
 
-class PaxosService : public Dispatcher {
+class PaxosService {
 public:
   Monitor *mon;
   Paxos *paxos;
@@ -29,11 +29,11 @@ public:
 protected:
   class C_RetryMessage : public Context {
     PaxosService *svc;
-    Message *m;
+    PaxosServiceMessage *m;
   public:
-    C_RetryMessage(PaxosService *s, Message *m_) : svc(s), m(m_) {}
+    C_RetryMessage(PaxosService *s, PaxosServiceMessage *m_) : svc(s), m(m_) {}
     void finish(int r) {
-      svc->dispatch(m);
+      svc->dispatch(m);//FIX_ME
     }
   };
   class C_Active : public Context {
@@ -71,7 +71,6 @@ protected:
 private:
   Context *proposal_timer;
   bool have_pending;
-  bool dispatch_impl(Message *m);
 
 public:
   PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p),
@@ -92,6 +91,7 @@ private:
 public:
   // i implement and you use
   void propose_pending();     // propose current pending as new paxos state
+  bool dispatch(PaxosServiceMessage *m);
 
   // you implement
   virtual void create_initial(bufferlist& bl) = 0;
@@ -100,14 +100,14 @@ public:
   virtual void encode_pending(bufferlist& bl) = 0; // [leader] finish and encode pending for next paxos state
   virtual void discard_pending() { }       // [leader] discard pending
 
-  virtual bool preprocess_query(Message *m) = 0;  // true if processed (e.g., read-only)
-  virtual bool prepare_update(Message *m) = 0;
+  virtual bool preprocess_query(PaxosServiceMessage *m) = 0;  // true if processed (e.g., read-only)
+  virtual bool prepare_update(PaxosServiceMessage *m) = 0;
   virtual bool should_propose(double &delay);
 
   virtual void committed() = 0;            // [leader] called after a proposed value commits
 
   virtual void tick() {}
-  
+
 };
 
 #endif
index 40512fde3ca56de7de79bc56bb4e0c463996ed09..2416e895d6adec3d321e1553391d58618160564c 100644 (file)
@@ -22,6 +22,7 @@ using namespace std;
 #include "messages/MPoolSnap.h"
 #include "messages/MPoolSnapReply.h"
 
+#include "messages/PaxosServiceMessage.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
 #include "messages/MMonPaxos.h"
index ade923ef06df329dc70042e3e4352a7312719ba2..36363f55d7563d9a3bf062c4699a2b3144be80f9 100644 (file)
@@ -38,6 +38,8 @@
 #define MSG_POOLSNAP               49
 #define MSG_POOLSNAPREPLY          48
 
+#define MSG_PAXOS                  40
+
 // osd internal
 #define MSG_OSD_PING         70
 #define MSG_OSD_BOOT         71
index 0d2a4264440ff9467176567a1d82e22d69511771..c792a97adc48b2a1b5ad40651f4ed00fcb5c1208 100644 (file)
@@ -408,14 +408,14 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
                  continue;
              }
              else {
-             for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
-               if (*i == snapid) {
-                 exists = true;
-                 break;
-               }
-             dout(10) << *iter << " has " << oi.snaps << " .. exists=" << exists << dendl;
-             if (!exists)
-               continue;
+               for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
+                 if (*i == snapid) {
+                   exists = true;
+                   break;
+                 }
+               dout(10) << *iter << " has " << oi.snaps << " .. exists=" << exists << dendl;
+               if (!exists)
+                 continue;
              }
            }
             response.entries.push_back(iter->oid);
index 367f1b3b8b23695f74e1bb636dfd991d4456ae88..4c49350428aaf58af35ed7fa7b87e1fe765d6515 100644 (file)
@@ -689,7 +689,7 @@ void Objecter::delete_pool_snap(int *reply, int pool, string& snapName, Context
 
 void Objecter::pool_snap_submit(SnapOp *op) {
   dout(10) << "pool_snap_submit " << op->tid << dendl;
-  MPoolSnap *m = new MPoolSnap(monmap->fsid, op->tid, op->pool, op->name, op->create);
+  MPoolSnap *m = new MPoolSnap(monmap->fsid, op->tid, op->pool, op->name, op->create, VERSION_T);
   int mon = monmap->pick_mon();
   messenger->send_message(m, monmap->get_inst(mon));
   op->last_submit = g_clock.now();
@@ -741,7 +741,7 @@ void Objecter::get_pool_stats(vector<string>& pools, map<string,pool_stat_t> *re
 void Objecter::poolstat_submit(PoolStatOp *op)
 {
   dout(10) << "poolstat_submit " << op->tid << dendl;
-  MGetPoolStats *m = new MGetPoolStats(monmap->fsid, op->tid, op->pools);
+  MGetPoolStats *m = new MGetPoolStats(monmap->fsid, op->tid, op->pools, VERSION_T);
   int mon = monmap->pick_mon();
   messenger->send_message(m, monmap->get_inst(mon));
   op->last_submit = g_clock.now();