#include "messages/MMDSMap.h"
#include "messages/MMDSGetMap.h"
#include "messages/MMDSBeacon.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
#include "messages/MGenericMessage.h"
handle_mds_getmap((MMDSGetMap*)m);
return true;
+ case MSG_MON_COMMAND:
+ return preprocess_command((MMonCommand*)m);
+
default:
assert(0);
delete m;
}
+bool MDSMonitor::preprocess_command(MMonCommand *m)
+{
+ int r = -1;
+ bufferlist rdata;
+ stringstream ss;
-int MDSMonitor::do_command(vector<string>& cmd, bufferlist& data,
- bufferlist& rdata, string &rs)
+ if (m->cmd.size() > 1) {
+ if (m->cmd[1] == "getmap") {
+ mdsmap.encode(rdata);
+ ss << "got mdsmap epoch " << mdsmap.get_epoch();
+ r = 0;
+ }
+ }
+
+ if (r != -1) {
+ string rs;
+ ss >> rs;
+ MMonCommandAck *reply = new MMonCommandAck(r, rs);
+ reply->set_data(rdata);
+ mon->messenger->send_message(reply, m->inst);
+ delete m;
+ return true;
+ } else
+ return false;
+}
+
+bool MDSMonitor::prepare_command(MMonCommand *m)
{
int r = -EINVAL;
stringstream ss;
+ bufferlist rdata;
- if (cmd.size() > 1) {
- if (cmd[1] == "stop" && cmd.size() > 2) {
- int who = atoi(cmd[2].c_str());
+ if (m->cmd.size() > 1) {
+ if (m->cmd[1] == "stop" && m->cmd.size() > 2) {
+ int who = atoi(m->cmd[2].c_str());
if (mdsmap.is_active(who)) {
r = 0;
ss << "telling mds" << who << " to stop";
ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")";
}
}
- else if (cmd[1] == "set_max_mds" && cmd.size() > 2) {
- pending_mdsmap.max_mds = atoi(cmd[2].c_str());
+ else if (m->cmd[1] == "set_max_mds" && m->cmd.size() > 2) {
+ pending_mdsmap.max_mds = atoi(m->cmd[2].c_str());
r = 0;
ss << "max_mds = " << pending_mdsmap.max_mds;
}
}
- if (r == -EINVAL) {
+ if (r == -EINVAL)
ss << "unrecognized command";
- }
-
- // reply
+ string rs;
getline(ss, rs);
- return r;
+
+ if (r >= 0) {
+ // success.. delay reply
+ paxos->wait_for_commit(new Monitor::C_Command(mon, m, r, rs));
+ return true;
+ } else {
+ // reply immediately
+ MMonCommandAck *reply = new MMonCommandAck(r, rs);
+ reply->set_data(rdata);
+ mon->messenger->send_message(reply, m->inst);
+ delete m;
+ return false;
+ }
}
class MMDSBeacon;
class MMDSGetMap;
+class MMonCommand;
class MDSMonitor : public PaxosService {
public:
void take_over(entity_addr_t addr, int mds);
- int do_command(vector<string>& cmd, bufferlist& data,
- bufferlist& rdata, string &rs);
+ bool preprocess_command(MMonCommand *m);
+ bool prepare_command(MMonCommand *m);
// beacons
map<entity_addr_t, utime_t> last_beacon;
pgmon->election_finished();
}
-
-int Monitor::do_command(vector<string>& cmd, bufferlist& data,
- bufferlist& rdata, string &rs)
+void Monitor::handle_command(MMonCommand *m)
{
- if (cmd.empty()) {
+ dout(0) << "handle_command " << *m << dendl;
+ string rs;
+ if (!m->cmd.empty()) {
+ if (m->cmd[0] == "mds") {
+ mdsmon->dispatch(m);
+ return;
+ }
+ if (m->cmd[0] == "osd") {
+ osdmon->dispatch(m);
+ return;
+ }
+ if (m->cmd[0] == "stop") {
+ do_stop();
+ return;
+ }
+ rs = "unrecognized subsystem";
+ } else
rs = "no command";
- return -EINVAL;
- }
-
- if (cmd[0] == "stop") {
- rs = "stopping";
- do_stop();
- return 0;
- }
- if (cmd[0] == "mds")
- return mdsmon->do_command(cmd, data, rdata, rs);
-
- if (cmd[0] == "osd")
- return osdmon->do_command(cmd, data, rdata, rs);
-
- // huh.
- rs = "unrecognized subsystem '" + cmd[0] + "'";
- return -EINVAL;
+ MMonCommandAck *reply = new MMonCommandAck(-EINVAL, rs);
+ messenger->send_message(reply, m->inst);
+ delete m;
}
-void Monitor::handle_command(MMonCommand *m)
+void Monitor::finish_command(MMonCommand *m, int rc, const string &rs)
{
- dout(0) << "handle_command " << *m << dendl;
-
- string rs; // return string
- bufferlist rdata; // return data
- int rc = do_command(m->cmd, m->get_data(), rdata, rs);
- if (rc == -EROFS && !is_leader()) {
- dout(10) << "forwarding command to leader" << dendl;
- messenger->send_message(m, monmap->get_inst(get_leader()));
- return;
- }
-
MMonCommandAck *reply = new MMonCommandAck(rc, rs);
- reply->set_data(rdata);
messenger->send_message(reply, m->inst);
delete m;
}
-
void Monitor::do_stop()
{
dout(0) << "do_stop -- initiating shutdown" << dendl;
void handle_ping_ack(class MPingAck *m);
void handle_command(class MMonCommand *m);
- int do_command(vector<string>& cmd, bufferlist& data,
- bufferlist& rdata, string &rs);
+ void finish_command(MMonCommand *m, int rc, const string &rs);
+public:
+ struct C_Command : public Context {
+ Monitor *mon;
+ MMonCommand *m;
+ int rc;
+ string rs;
+ C_Command(Monitor *_mm, MMonCommand *_m, int r, string& s) :
+ mon(_mm), m(_m), rc(r), rs(s) {}
+ void finish(int r) {
+ mon->finish_command(m, rc, rs);
+ }
+ };
public:
Monitor(int w, Messenger *m, MonMap *mm) :
#include "messages/MOSDBoot.h"
#include "messages/MOSDIn.h"
#include "messages/MOSDOut.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
#include "common/Timer.h"
#include "config.h"
+#include <sstream>
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) *_dout << dbeginl << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd e" << osdmap.get_epoch() << " "
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) *_derr << dbeginl << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd e" << osdmap.get_epoch() << " "
handle_osd_getmap((MOSDGetMap*)m);
return true;
+ case MSG_MON_COMMAND:
+ return preprocess_command((MMonCommand*)m);
+
// damp updates
case MSG_OSD_FAILURE:
return preprocess_failure((MOSDFailure*)m);
case MSG_OSD_BOOT:
return prepare_boot((MOSDBoot*)m);
+ case MSG_MON_COMMAND:
+ return prepare_command((MMonCommand*)m);
+
/*
case MSG_OSD_IN:
return prepare_in((MOSDIn*)m);
bool OSDMonitor::should_propose(double& delay)
{
+ dout(10) << "should_propose" << dendl;
if (osdmap.epoch == 1) {
if (pending_inc.new_up.size() == (unsigned)g_conf.num_osd) {
delay = 0.0;
}
+
+bool OSDMonitor::preprocess_command(MMonCommand *m)
+{
+ int r = -1;
+ bufferlist rdata;
+ stringstream ss;
+
+ if (m->cmd.size() > 1) {
+ if (m->cmd[1] == "getmap") {
+ osdmap.encode(rdata);
+ ss << "got osdmap epoch " << osdmap.get_epoch();
+ r = 0;
+ }
+ else if (m->cmd[1] == "getcrushmap") {
+ osdmap.crush._encode(rdata);
+ ss << "got crush map from osdmap epoch " << osdmap.get_epoch();
+ r = 0;
+ }
+ }
+ if (r != -1) {
+ string rs;
+ ss >> rs;
+ MMonCommandAck *reply = new MMonCommandAck(r, rs);
+ reply->set_data(rdata);
+ mon->messenger->send_message(reply, m->inst);
+ delete m;
+ return true;
+ } else
+ return false;
+}
+
+bool OSDMonitor::prepare_command(MMonCommand *m)
+{
+ if (m->cmd.size() > 1) {
+ if (m->cmd[1] == "setcrushmap") {
+ dout(10) << "prepare_command setting new crush map" << dendl;
+ pending_inc.crush = m->get_data();
+ string rs = "set crush map";
+ paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs));
+ return true;
+ }
+ }
+ return false;
+}
+/*
int OSDMonitor::do_command(vector<string>& cmd, bufferlist& data,
bufferlist& rdata, string &rs)
{
return 0;
}
if (cmd[1] == "setcrushmap") {
- if (!mon->is_leader()) return -EROFS;
- // HACK
- pending_inc.crush = data;
- propose_pending();
+ return -EAGAIN;
}
rs = "unknown command";
return -EINVAL;
}
+*/
class Monitor;
class MOSDBoot;
+class MMonCommand;
class OSDMonitor : public PaxosService {
public:
void tick(); // check state, take actions
- int do_command(vector<string>& cmd, bufferlist& data,
- bufferlist& rdata, string &rs);
+ bool preprocess_command(MMonCommand *m);
+ bool prepare_command(MMonCommand *m);
+ void finish_command(MMonCommand *m, int rc, const string &rs);
void mark_all_down();