messages/MRemoveSnaps.h\
messages/MStatfs.h\
messages/MStatfsReply.h\
+ messages/PaxosServiceMessage.h\
mon/ClassMonitor.h\
mon/ClientMap.h\
mon/ClientMonitor.h\
CLASS_RESPONSE,
};
-class MClass : public Message {
+class MClass : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
deque<ClassInfo> info;
__s32 action;
- MClass() : Message(MSG_CLASS) {}
+ MClass() : PaxosServiceMessage(MSG_CLASS, 0) {}
#if 0
- MClass(ceph_fsid_t& f, deque<ClassLibraryIncremental>& e) : Message(MSG_CLASS), fsid(f), entries(e), last(0), action(0) { }
+ MClass(ceph_fsid_t& f, deque<ClassLibraryIncremental>& e) :
+ PaxosServiceMessage(MSG_CLASS, 0),
+ fsid(f), entries(e), last(0), action(0) {}
#endif
- MClass(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS), fsid(f), last(l) {}
+ MClass(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_CLASS, l), fsid(f), last(l) {}
const char *get_type_name() { return "class"; }
void print(ostream& out) {
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(info, payload);
::encode(impl, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(info, p);
::decode(impl, p);
#include "include/ClassLibrary.h"
-class MClassAck : public Message {
+class MClassAck : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
version_t last;
- MClassAck() : Message(MSG_CLASS_ACK) {}
- MClassAck(ceph_fsid_t& f, version_t l) : Message(MSG_CLASS_ACK), fsid(f), last(l) {}
+ MClassAck() : PaxosServiceMessage(MSG_CLASS_ACK, 0) {}
+ MClassAck(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_CLASS_ACK, l), fsid(f), last(l) {}
const char *get_type_name() { return "class_ack"; }
void print(ostream& out) {
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(last, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(last, p);
}
#ifndef __MGETPOOLSTATS_H
#define __MGETPOOLSTATS_H
-class MGetPoolStats : public Message {
+#include "messages/PaxosServiceMessage.h"
+
+class MGetPoolStats : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
tid_t tid;
vector<string> pools;
- MGetPoolStats() : Message(MSG_GETPOOLSTATS) {}
- MGetPoolStats(ceph_fsid_t& f, tid_t t, vector<string>& ls) :
- Message(MSG_GETPOOLSTATS),
+ MGetPoolStats() : PaxosServiceMessage(MSG_GETPOOLSTATS, 0) {}
+ MGetPoolStats(ceph_fsid_t& f, tid_t t, vector<string>& ls, version_t l) :
+ PaxosServiceMessage(MSG_GETPOOLSTATS, l),
fsid(f), tid(t), pools(ls) { }
const char *get_type_name() { return "getpoolstats"; }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(tid, payload);
::encode(pools, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(tid, p);
::decode(pools, p);
#ifndef __MGETPOOLSTATSREPLY_H
#define __MGETPOOLSTATSREPLY_H
-class MGetPoolStatsReply : public Message {
+class MGetPoolStatsReply : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
tid_t tid;
map<string,pool_stat_t> pool_stats;
- MGetPoolStatsReply() : Message(MSG_GETPOOLSTATSREPLY) {}
- MGetPoolStatsReply(ceph_fsid_t& f, tid_t t) :
- Message(MSG_GETPOOLSTATSREPLY),
+ MGetPoolStatsReply() : PaxosServiceMessage(MSG_GETPOOLSTATSREPLY, 0) {}
+ MGetPoolStatsReply(ceph_fsid_t& f, tid_t t, version_t v) :
+ PaxosServiceMessage(MSG_GETPOOLSTATSREPLY, v),
fsid(f), tid(t) { }
const char *get_type_name() { return "getpoolstats"; }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(tid, payload);
::encode(pool_stats, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(tid, p);
::decode(pool_stats, p);
#include "include/LogEntry.h"
-class MLog : public Message {
+class MLog : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
deque<LogEntry> entries;
version_t last;
- MLog() : Message(MSG_LOG) {}
- MLog(ceph_fsid_t& f, deque<LogEntry>& e) : Message(MSG_LOG), fsid(f), entries(e), last(0) { }
- MLog(ceph_fsid_t& f, version_t l) : Message(MSG_LOG), fsid(f), last(l) {}
+ MLog() : PaxosServiceMessage(MSG_LOG, 0) {}
+ MLog(ceph_fsid_t& f, deque<LogEntry>& e) : PaxosServiceMessage(MSG_LOG, 0), fsid(f), entries(e), last(0) { }
+ MLog(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_LOG, l), fsid(f), last(l) {}
const char *get_type_name() { return "log"; }
void print(ostream& out) {
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(entries, payload);
::encode(last, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(entries, p);
::decode(last, p);
#define __MLOGACK_H
#include "include/LogEntry.h"
+#include "messages/PaxosServiceMessage.h"
-class MLogAck : public Message {
+class MLogAck : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
version_t last;
- MLogAck() : Message(MSG_LOGACK) {}
- MLogAck(ceph_fsid_t& f, version_t l) : Message(MSG_LOGACK), fsid(f), last(l) {}
+ MLogAck() : PaxosServiceMessage(MSG_LOGACK, 0) {}
+ MLogAck(ceph_fsid_t& f, version_t l) : PaxosServiceMessage(MSG_LOGACK, l), fsid(f), last(l) {}
const char *get_type_name() { return "log_ack"; }
void print(ostream& out) {
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(last, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(last, p);
}
#ifndef __MMDSBEACON_H
#define __MMDSBEACON_H
-#include "msg/Message.h"
+#include "messages/PaxosServiceMessage.h"
#include "include/types.h"
#include "mds/MDSMap.h"
-class MMDSBeacon : public Message {
+class MMDSBeacon : public PaxosServiceMessage {
ceph_fsid_t fsid;
string name;
epoch_t last_epoch_seen; // include last mdsmap epoch mds has seen to avoid race with monitor decree
string standby_for_name;
public:
- MMDSBeacon() : Message(MSG_MDS_BEACON) {}
+ MMDSBeacon() : PaxosServiceMessage(MSG_MDS_BEACON, 0) {}
MMDSBeacon(ceph_fsid_t &f, string& n, epoch_t les, int st, version_t se) :
- Message(MSG_MDS_BEACON),
+ PaxosServiceMessage(MSG_MDS_BEACON, se),
fsid(f), name(n), last_epoch_seen(les), state(st), seq(se),
standby_for_rank(-1) { }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(last_epoch_seen, payload);
::encode(state, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(last_epoch_seen, p);
::decode(state, p);
#ifndef __MMONCOMMAND_H
#define __MMONCOMMAND_H
-#include "msg/Message.h"
+#include "messages/PaxosServiceMessage.h"
#include <vector>
using std::vector;
-class MMonCommand : public Message {
+class MMonCommand : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
vector<string> cmd;
- MMonCommand() : Message(MSG_MON_COMMAND) {}
- MMonCommand(ceph_fsid_t &f) :
- Message(MSG_MON_COMMAND),
+ MMonCommand() : PaxosServiceMessage(MSG_MON_COMMAND, 0) {}
+ MMonCommand(ceph_fsid_t &f, version_t version) :
+ PaxosServiceMessage(MSG_MON_COMMAND, version),
fsid(f) { }
const char *get_type_name() { return "mon_command"; }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(cmd, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(cmd, p);
}
#include "msg/Message.h"
-class MMonCommandAck : public Message {
+class MMonCommandAck : public PaxosServiceMessage {
public:
vector<string> cmd;
__s32 r;
string rs;
- MMonCommandAck() : Message(MSG_MON_COMMAND_ACK) {}
- MMonCommandAck(vector<string>& c, int _r, string s) :
- Message(MSG_MON_COMMAND_ACK),
+ MMonCommandAck() : PaxosServiceMessage(MSG_MON_COMMAND_ACK, 0) {}
+ MMonCommandAck(vector<string>& c, int _r, string s, version_t v) :
+ PaxosServiceMessage(MSG_MON_COMMAND_ACK, v),
cmd(c), r(_r), rs(s) { }
const char *get_type_name() { return "mon_command"; }
}
void encode_payload() {
+ paxos_encode();
::encode(r, payload);
::encode(rs, payload);
::encode(cmd, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(r, p);
::decode(rs, p);
::decode(cmd, p);
#include <vector>
using std::vector;
-class MMonObserve : public Message {
+class MMonObserve : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
uint32_t machine_id;
version_t ver;
- MMonObserve() : Message(MSG_MON_OBSERVE) {}
+ MMonObserve() : PaxosServiceMessage(MSG_MON_OBSERVE, 0) {}
MMonObserve(ceph_fsid_t &f, int mid, version_t v) :
- Message(MSG_MON_OBSERVE),
+ PaxosServiceMessage(MSG_MON_OBSERVE, v),
fsid(f), machine_id(mid), ver(v) { }
const char *get_type_name() { return "mon_observe"; }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(machine_id, payload);
::encode(ver, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(machine_id, p);
::decode(ver, p);
#include "msg/Message.h"
-class MMonObserveNotify : public Message {
+class MMonObserveNotify : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
int32_t machine_id;
version_t ver;
bool is_latest;
- MMonObserveNotify() : Message(MSG_MON_OBSERVE_NOTIFY) {}
+ MMonObserveNotify() : PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, 0) {}
MMonObserveNotify(ceph_fsid_t& f, int id, bufferlist& b, version_t v, bool l) :
- Message(MSG_MON_OBSERVE_NOTIFY), fsid(f), machine_id(id), bl(b), ver(v), is_latest(l) {}
+ PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, v), fsid(f), machine_id(id), bl(b), ver(v), is_latest(l) {}
const char *get_type_name() { return "mon_observe_notify"; }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(machine_id, payload);
::encode(bl, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(machine_id, p);
::decode(bl, p);
#include "include/types.h"
#include "osd/osd_types.h"
-class MOSDBoot : public Message {
+class MOSDBoot : public PaxosServiceMessage {
public:
OSDSuperblock sb;
- MOSDBoot() {}
- MOSDBoot(OSDSuperblock& s) :
- Message(MSG_OSD_BOOT),
- sb(s) {
+ MOSDBoot() : PaxosServiceMessage(){}
+ MOSDBoot(OSDSuperblock& s, version_t v) :
+ PaxosServiceMessage(MSG_OSD_BOOT, v), sb(s) {
}
const char *get_type_name() { return "osd_boot"; }
#ifndef __MPOOLSNAP_H
#define __MPOOLSNAP_H
+#include "messages/PaxosServiceMessage.h"
-class MPoolSnap : public Message {
+
+class MPoolSnap : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
tid_t tid;
string name;
bool create;
- MPoolSnap() : Message(MSG_POOLSNAP) {}
- MPoolSnap( ceph_fsid_t& f, tid_t t, int p, string& n, bool c) :
- Message(MSG_POOLSNAP), fsid(f), tid(t), pool(p), name(n), create(c) {}
+ MPoolSnap() : PaxosServiceMessage(MSG_POOLSNAP, 0) {}
+ MPoolSnap( ceph_fsid_t& f, tid_t t, int p, string& n, bool c, version_t v) :
+ PaxosServiceMessage(MSG_POOLSNAP, v), fsid(f), tid(t), pool(p), name(n), create(c) {}
const char *get_type_name() { return "poolsnap"; }
void print(ostream& out) {
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(tid, payload);
::encode(pool, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(tid, p);
::decode(pool, p);
#define __MPOOLSNAPREPLY_H
-class MPoolSnapReply : public Message {
+class MPoolSnapReply : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
tid_t tid;
epoch_t epoch;
- MPoolSnapReply() : Message(MSG_POOLSNAPREPLY) {}
- MPoolSnapReply( ceph_fsid_t& f, tid_t t, int rc, int e) :
- Message(MSG_POOLSNAPREPLY), fsid(f), tid(t), replyCode(rc), epoch(e) {}
+ MPoolSnapReply() : PaxosServiceMessage(MSG_POOLSNAPREPLY, 0) {}
+ MPoolSnapReply( ceph_fsid_t& f, tid_t t, int rc, int e, version_t v) :
+ PaxosServiceMessage(MSG_POOLSNAPREPLY, v), fsid(f), tid(t), replyCode(rc), epoch(e) {}
const char *get_type_name() { return "poolsnapreply"; }
}
void encode_payload() {
+ paxos_encode();
::encode(fsid, payload);
::encode(tid, payload);
::encode(replyCode, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
::decode(fsid, p);
::decode(tid, p);
::decode(replyCode, p);
p->second.encode(bl);
}
-bool ClassMonitor::preprocess_query(Message *m)
+bool ClassMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
}
}
-bool ClassMonitor::prepare_update(Message *m)
+bool ClassMonitor::prepare_update(PaxosServiceMessage *m)
{
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
void committed();
- bool preprocess_query(Message *m); // true if processed.
- bool prepare_update(Message *m);
+ bool preprocess_query(PaxosServiceMessage *m); // true if processed.
+ bool prepare_update(PaxosServiceMessage *m);
bool preprocess_class(MClass *m);
bool prepare_class(MClass *m);
return false;
}
-bool ClientMonitor::preprocess_query(Message *m)
+bool ClientMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
}
}
-bool ClientMonitor::prepare_update(Message *m)
+bool ClientMonitor::prepare_update(PaxosServiceMessage *m)
{
stringstream ss;
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
if (r >= 0)
cmon->_mounted(client, m);
else
- cmon->dispatch((Message*)m);
+ cmon->dispatch((PaxosServiceMessage*)m);
}
};
if (r >= 0)
cmon->_unmounted(m);
else
- cmon->dispatch((Message*)m);
+ cmon->dispatch((PaxosServiceMessage*)m);
}
};
-
ClientMap client_map;
private:
void _mounted(int c, MClientMount *m);
void _unmounted(MClientUnmount *m);
- bool preprocess_query(Message *m); // true if processed.
- bool prepare_update(Message *m);
+ bool preprocess_query(PaxosServiceMessage *m); // true if processed.
+ bool prepare_update(PaxosServiceMessage *m);
bool preprocess_command(MMonCommand *m); // true if processed.
bool prepare_command(MMonCommand *m);
p->second.encode(bl);
}
-bool LogMonitor::preprocess_query(Message *m)
+bool LogMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
}
}
-bool LogMonitor::prepare_update(Message *m)
+bool LogMonitor::prepare_update(PaxosServiceMessage *m)
{
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
void committed();
- bool preprocess_query(Message *m); // true if processed.
- bool prepare_update(Message *m);
+ bool preprocess_query(PaxosServiceMessage *m); // true if processed.
+ bool prepare_update(PaxosServiceMessage *m);
bool preprocess_log(MLog *m);
bool prepare_log(MLog *m);
}
-bool MDSMonitor::preprocess_query(Message *m)
+bool MDSMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
}
-bool MDSMonitor::prepare_update(Message *m)
+bool MDSMonitor::prepare_update(PaxosServiceMessage *m)
{
dout(7) << "prepare_update " << *m << dendl;
if (r >= 0)
mm->_updated(m); // success
else
- mm->dispatch((Message*)m); // try again
+ mm->dispatch((PaxosServiceMessage*)m); // try again
}
};
void _updated(MMDSBeacon *m);
- bool preprocess_query(Message *m); // true if processed.
- bool prepare_update(Message *m);
+ bool preprocess_query(PaxosServiceMessage *m); // true if processed.
+ bool prepare_update(PaxosServiceMessage *m);
bool should_propose(double& delay);
void committed();
#include "MonitorStore.h"
-#include "msg/Message.h"
#include "msg/Messenger.h"
+#include "messages/PaxosServiceMessage.h"
#include "messages/MMonMap.h"
#include "messages/MMonGetMap.h"
#include "messages/MGenericMessage.h"
void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata)
{
- MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs);
+ MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, VERSION_T);
reply->set_data(rdata);
messenger->send_message(reply, m->get_orig_source_inst());
delete m;
void Monitor::inject_args(const entity_inst_t& inst, vector<string>& args)
{
dout(10) << "inject_args " << inst << " " << args << dendl;
- MMonCommand *c = new MMonCommand(monmap->fsid);
+ MMonCommand *c = new MMonCommand(monmap->fsid, VERSION_T);
c->cmd = args;
messenger->send_message(c, inst);
}
// -------------
-bool OSDMonitor::preprocess_query(Message *m)
+bool OSDMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
}
}
-bool OSDMonitor::prepare_update(Message *m)
+bool OSDMonitor::prepare_update(PaxosServiceMessage *m)
{
dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
void OSDMonitor::_pool_snap(MPoolSnap *m, int replyCode, epoch_t epoch)
{
- MPoolSnapReply *reply = new MPoolSnapReply(m->fsid, m->tid, replyCode, epoch);
+ MPoolSnapReply *reply = new MPoolSnapReply(m->fsid, m->tid, replyCode, epoch, VERSION_T);
mon->messenger->send_message(reply, m->get_orig_source_inst());
delete m;
}
void committed();
- void handle_query(Message *m);
- bool preprocess_query(Message *m); // true if processed.
- bool prepare_update(Message *m);
+ void handle_query(PaxosServiceMessage *m);
+ bool preprocess_query(PaxosServiceMessage *m); // true if processed.
+ bool prepare_update(PaxosServiceMessage *m);
bool should_propose(double &delay);
// ...
if (r >= 0)
cmon->_booted(m, true);
else
- cmon->dispatch((Message*)m);
+ cmon->dispatch((PaxosServiceMessage*)m);
}
};
+
struct C_Alive : public Context {
OSDMonitor *osdmon;
MOSDAlive *m;
if (r >= 0)
cmon->_reported_failure(m);
else
- cmon->dispatch((Message*)m);
+ cmon->dispatch((PaxosServiceMessage*)m);
}
};
struct C_Snap : public Context {
pending_inc.encode(bl);
}
-bool PGMonitor::preprocess_query(Message *m)
+bool PGMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
}
}
-bool PGMonitor::prepare_update(Message *m)
+bool PGMonitor::prepare_update(PaxosServiceMessage *m)
{
dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
goto out;
}
- reply = new MGetPoolStatsReply(m->fsid, m->tid);
+ reply = new MGetPoolStatsReply(m->fsid, m->tid, VERSION_T);
for (vector<string>::iterator p = m->pools.begin();
p != m->pools.end();
void committed();
- bool preprocess_query(Message *m); // true if processed.
- bool prepare_update(Message *m);
+ bool preprocess_query(PaxosServiceMessage *m); // true if processed.
+ bool prepare_update(PaxosServiceMessage *m);
bool preprocess_pg_stats(MPGStats *stats);
bool prepare_pg_stats(MPGStats *stats);
}
-void Paxos::dispatch(Message *m)
+void Paxos::dispatch(PaxosServiceMessage *m)
{
// election in progress?
if (mon->is_starting()) {
#include "include/types.h"
#include "mon_types.h"
#include "include/buffer.h"
-#include "msg/Message.h"
+#include "messages/PaxosServiceMessage.h"
#include "msg/msg_types.h"
#include "include/Context.h"
return machine_name;
}
- void dispatch(Message *m);
+ void dispatch(PaxosServiceMessage *m);
void init();
}
-bool PaxosService::dispatch_impl(Message *m)
+bool PaxosService::dispatch(PaxosServiceMessage *m)
{
dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl;
#ifndef __PAXOSSERVICE_H
#define __PAXOSSERVICE_H
-#include "msg/Dispatcher.h"
+#include "messages/PaxosServiceMessage.h"
#include "include/Context.h"
class Monitor;
class Paxos;
-class PaxosService : public Dispatcher {
+class PaxosService {
public:
Monitor *mon;
Paxos *paxos;
protected:
class C_RetryMessage : public Context {
PaxosService *svc;
- Message *m;
+ PaxosServiceMessage *m;
public:
- C_RetryMessage(PaxosService *s, Message *m_) : svc(s), m(m_) {}
+ C_RetryMessage(PaxosService *s, PaxosServiceMessage *m_) : svc(s), m(m_) {}
void finish(int r) {
- svc->dispatch(m);
+ svc->dispatch(m);//FIX_ME
}
};
class C_Active : public Context {
private:
Context *proposal_timer;
bool have_pending;
- bool dispatch_impl(Message *m);
public:
PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p),
public:
// i implement and you use
void propose_pending(); // propose current pending as new paxos state
+ bool dispatch(PaxosServiceMessage *m);
// you implement
virtual void create_initial(bufferlist& bl) = 0;
virtual void encode_pending(bufferlist& bl) = 0; // [leader] finish and encode pending for next paxos state
virtual void discard_pending() { } // [leader] discard pending
- virtual bool preprocess_query(Message *m) = 0; // true if processed (e.g., read-only)
- virtual bool prepare_update(Message *m) = 0;
+ virtual bool preprocess_query(PaxosServiceMessage *m) = 0; // true if processed (e.g., read-only)
+ virtual bool prepare_update(PaxosServiceMessage *m) = 0;
virtual bool should_propose(double &delay);
virtual void committed() = 0; // [leader] called after a proposed value commits
virtual void tick() {}
-
+
};
#endif
#include "messages/MPoolSnap.h"
#include "messages/MPoolSnapReply.h"
+#include "messages/PaxosServiceMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
#define MSG_POOLSNAP 49
#define MSG_POOLSNAPREPLY 48
+#define MSG_PAXOS 40
+
// osd internal
#define MSG_OSD_PING 70
#define MSG_OSD_BOOT 71
continue;
}
else {
- for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
- if (*i == snapid) {
- exists = true;
- break;
- }
- dout(10) << *iter << " has " << oi.snaps << " .. exists=" << exists << dendl;
- if (!exists)
- continue;
+ for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
+ if (*i == snapid) {
+ exists = true;
+ break;
+ }
+ dout(10) << *iter << " has " << oi.snaps << " .. exists=" << exists << dendl;
+ if (!exists)
+ continue;
}
}
response.entries.push_back(iter->oid);
void Objecter::pool_snap_submit(SnapOp *op) {
dout(10) << "pool_snap_submit " << op->tid << dendl;
- MPoolSnap *m = new MPoolSnap(monmap->fsid, op->tid, op->pool, op->name, op->create);
+ MPoolSnap *m = new MPoolSnap(monmap->fsid, op->tid, op->pool, op->name, op->create, VERSION_T);
int mon = monmap->pick_mon();
messenger->send_message(m, monmap->get_inst(mon));
op->last_submit = g_clock.now();
void Objecter::poolstat_submit(PoolStatOp *op)
{
dout(10) << "poolstat_submit " << op->tid << dendl;
- MGetPoolStats *m = new MGetPoolStats(monmap->fsid, op->tid, op->pools);
+ MGetPoolStats *m = new MGetPoolStats(monmap->fsid, op->tid, op->pools, VERSION_T);
int mon = monmap->pick_mon();
messenger->send_message(m, monmap->get_inst(mon));
op->last_submit = g_clock.now();