#include "messages/MMDSGetMap.h"
#include "messages/MMDSBeacon.h"
-#include "messages/MMonCommand.h"
-#include "messages/MMonCommandAck.h"
-
#include "messages/MGenericMessage.h"
handle_mds_getmap((MMDSGetMap*)m);
return true;
- case MSG_MON_COMMAND:
- return false;
-
default:
assert(0);
delete m;
case MSG_MDS_BEACON:
return handle_beacon((MMDSBeacon*)m);
- case MSG_MON_COMMAND:
- return handle_command((MMonCommand*)m);
-
default:
assert(0);
delete m;
-bool MDSMonitor::handle_command(MMonCommand *m)
+int MDSMonitor::do_command(vector<string>& cmd, bufferlist& data,
+ bufferlist& rdata, string &rs)
{
int r = -EINVAL;
stringstream ss;
- if (m->cmd.size() > 1) {
- if (m->cmd[1] == "stop" && m->cmd.size() > 2) {
- int who = atoi(m->cmd[2].c_str());
+ if (cmd.size() > 1) {
+ if (cmd[1] == "stop" && cmd.size() > 2) {
+ int who = atoi(cmd[2].c_str());
if (mdsmap.is_active(who)) {
r = 0;
ss << "telling mds" << who << " to stop";
ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")";
}
}
- else if (m->cmd[1] == "set_max_mds" && m->cmd.size() > 2) {
- pending_mdsmap.max_mds = atoi(m->cmd[2].c_str());
+ else if (cmd[1] == "set_max_mds" && cmd.size() > 2) {
+ pending_mdsmap.max_mds = atoi(cmd[2].c_str());
r = 0;
ss << "max_mds = " << pending_mdsmap.max_mds;
}
if (r == -EINVAL) {
ss << "unrecognized command";
}
-
+
// reply
- string rs;
- getline(ss,rs);
- mon->messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
- delete m;
- return r >= 0;
+ getline(ss, rs);
+ return r;
}
bool preprocess_beacon(class MMDSBeacon *m);
bool handle_beacon(class MMDSBeacon *m);
- bool handle_command(class MMonCommand *m);
void handle_mds_getmap(MMDSGetMap *m);
void take_over(entity_addr_t addr, int mds);
+ int do_command(vector<string>& cmd, bufferlist& data,
+ bufferlist& rdata, string &rs);
+
// beacons
map<entity_addr_t, utime_t> last_beacon;
pgmon = new PGMonitor(this, &paxos_pgmap);
// init paxos
- paxos_test.init();
paxos_osdmap.init();
paxos_mdsmap.init();
paxos_clientmap.init();
state = STATE_STARTING;
// tell paxos
- paxos_test.election_starting();
paxos_mdsmap.election_starting();
paxos_osdmap.election_starting();
paxos_clientmap.election_starting();
dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << dendl;
// init paxos
- paxos_test.leader_init();
paxos_mdsmap.leader_init();
paxos_osdmap.leader_init();
paxos_clientmap.leader_init();
dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << dendl;
// init paxos
- paxos_test.peon_init();
paxos_mdsmap.peon_init();
paxos_osdmap.peon_init();
paxos_clientmap.peon_init();
}
+int Monitor::do_command(vector<string>& cmd, bufferlist& data,
+ bufferlist& rdata, string &rs)
+{
+ if (cmd.empty()) {
+ rs = "no command";
+ return -EINVAL;
+ }
+
+ if (cmd[0] == "stop") {
+ rs = "stopping";
+ do_stop();
+ return 0;
+ }
+ if (cmd[0] == "mds")
+ return mdsmon->do_command(cmd, data, rdata, rs);
+
+ if (cmd[0] == "osd")
+ return osdmon->do_command(cmd, data, rdata, rs);
+
+ // huh.
+ rs = "unrecognized subsystem '" + cmd[0] + "'";
+ return -EINVAL;
+}
+
void Monitor::handle_command(MMonCommand *m)
{
dout(0) << "handle_command " << *m << dendl;
- int r = -1;
- string rs = "unrecognized command";
-
- if (!m->cmd.empty()) {
- if (m->cmd[0] == "stop") {
- r = 0;
- rs = "stopping";
- do_stop();
- }
- else if (m->cmd[0] == "mds") {
- mdsmon->dispatch(m);
- return;
- }
- else if (m->cmd[0] == "osd") {
-
- }
- }
-
- // reply
- messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
+ string rs; // return string
+ bufferlist rdata; // return data
+ int rc = do_command(m->cmd, m->get_data(), rdata, rs);
+
+ MMonCommandAck *reply = new MMonCommandAck(rc, rs);
+ reply->set_data(rdata);
+ messenger->send_message(reply, m->get_source_inst());
delete m;
}
void Monitor::do_stop()
{
- dout(0) << "do_stop -- shutting down" << dendl;
+ dout(0) << "do_stop -- initiating shutdown" << dendl;
stopping = true;
mdsmon->do_stop();
}
// send it to the right paxos instance
switch (pm->machine_id) {
- case PAXOS_TEST:
- paxos_test.dispatch(m);
- break;
case PAXOS_OSDMAP:
paxos_osdmap.dispatch(m);
break;
// -- paxos --
- Paxos paxos_test;
Paxos paxos_mdsmap;
Paxos paxos_osdmap;
Paxos paxos_clientmap;
void handle_ping_ack(class MPingAck *m);
void handle_command(class MMonCommand *m);
-
+ int do_command(vector<string>& cmd, bufferlist& data,
+ bufferlist& rdata, string &rs);
public:
Monitor(int w, Messenger *m, MonMap *mm) :
mon_epoch(0),
leader(0),
- paxos_test(this, w, PAXOS_TEST),
paxos_mdsmap(this, w, PAXOS_MDSMAP),
paxos_osdmap(this, w, PAXOS_OSDMAP),
paxos_clientmap(this, w, PAXOS_CLIENTMAP),
}
+int OSDMonitor::do_command(vector<string>& cmd, bufferlist& data,
+ bufferlist& rdata, string &rs)
+{
+ rs = "unknown command";
+ return -EINVAL;
+}
void tick(); // check state, take actions
+ int do_command(vector<string>& cmd, bufferlist& data,
+ bufferlist& rdata, string &rs);
+
void mark_all_down();
void send_latest(entity_inst_t i, epoch_t start=0);
* "read" their copy of the last committed value.
*
* This provides a simple replication substrate that services can be built on top of.
+ * See PaxosService.h
*/
#ifndef __MON_PAXOS_H
void propose_pending(); // propose current pending as new paxos state
// you implement
- virtual bool update_from_paxos() = 0; // assimilate latest paxos state
+ virtual bool update_from_paxos() = 0; // assimilate latest state from paxos
virtual void create_pending() = 0; // [leader] create new pending structures
virtual void create_initial() = 0; // [leader] populate pending with initial state (1)
virtual void encode_pending(bufferlist& bl) = 0; // [leader] finish and encode pending for next paxos state
virtual bool prepare_update(Message *m) = 0;
virtual bool should_propose(double &delay);
- virtual void committed() = 0;
+ virtual void committed() = 0; // [leader] called after a proposed value commits
};