#include "global/signal_handler.h"
#include "include/types.h"
+#include "include/str_list.h"
#include "common/entity_name.h"
#include "common/Clock.h"
#include "common/signal.h"
#include "messages/MMDSTableRequest.h"
#include "messages/MMonCommand.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
#include "auth/AuthAuthorizeHandler.h"
#include "auth/KeyRing.h"
return;
}
+/* This function DOES put the passed message before returning*/
+void MDS::handle_command(MCommand *m)
+{
+ // FIXME authorize based on m->get_connection()
+
+ int r = 0;
+ cmdmap_t cmdmap;
+ std::stringstream ss;
+ std::string outs;
+ bufferlist outbl;
+ Context *run_after = NULL;
+
+
+ if (m->cmd.empty()) {
+ ss << "no command given";
+ outs = ss.str();
+ } else if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
+ r = -EINVAL;
+ outs = ss.str();
+ } else {
+ r = _handle_command(cmdmap, m->get_data(), &outbl, &outs, &run_after);
+ }
+
+ if (m->get_connection()) {
+ MCommandReply *reply = new MCommandReply(r, outs);
+ reply->set_tid(m->get_tid());
+ reply->set_data(outbl);
+ m->get_connection()->send_message(reply);
+ }
+
+ if (run_after) {
+ run_after->complete(0);
+ }
+
+ m->put();
+}
+
+
+struct MDSCommand {
+ string cmdstring;
+ string helpstring;
+ string module;
+ string perm;
+ string availability;
+} mds_commands[] = {
+
+#define COMMAND(parsesig, helptext, module, perm, availability) \
+ {parsesig, helptext, module, perm, availability},
+
+COMMAND("injectargs " \
+ "name=injected_args,type=CephString,n=N",
+ "inject configuration arguments into running MDS",
+ "mds", "*", "cli,rest")
+COMMAND("exit",
+ "Terminate this MDS",
+ "mds", "*", "cli,rest")
+COMMAND("respawn",
+ "Restart this MDS",
+ "mds", "*", "cli,rest")
+COMMAND("session kill " \
+ "name=session_id,type=CephInt",
+ "End a client session",
+ "mds", "*", "cli,rest")
+COMMAND("cpu_profiler " \
+ "name=arg,type=CephChoices,strings=status|flush",
+ "run cpu profiling on daemon", "mds", "rw", "cli,rest")
+COMMAND("heap " \
+ "name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
+ "show heap usage info (available only if compiled with tcmalloc)", \
+ "mds", "*", "cli,rest")
+};
+
+// FIXME: reinstate dumpcache as an admin socket command
+// -- it makes no sense for it to be a remote command when
+// the output is a local file
+// FIXME: reinstate issue_caps, try_eval, fragment_dir, merge_dir, export_dir
+// *if* it makes sense to do so (or should these be admin socket things?)
/* This function DOES put the passed message before returning*/
void MDS::handle_command(MMonCommand *m)
{
- dout(10) << "handle_command args: " << m->cmd << dendl;
- if (m->cmd[0] == "injectargs") {
- if (m->cmd.size() < 2) {
+ bufferlist outbl;
+ std::string outs;
+ _handle_command_legacy(m->cmd);
+ m->put();
+}
+
+int MDS::_handle_command(
+ const cmdmap_t &cmdmap,
+ bufferlist const &inbl,
+ bufferlist *outbl,
+ std::string *outs,
+ Context **run_later)
+{
+ assert(outbl != NULL);
+ assert(outs != NULL);
+
+ class SuicideLater : public MDSInternalContext
+ {
+ public:
+
+ SuicideLater(MDS *mds) : MDSInternalContext(mds) {}
+ void finish(int r) {
+ // Wait a little to improve chances of caller getting
+ // our response before seeing us disappear from mdsmap
+ sleep(1);
+
+ mds->suicide();
+ }
+ };
+
+
+ class RespawnLater : public MDSInternalContext
+ {
+ public:
+
+ RespawnLater(MDS *mds) : MDSInternalContext(mds) {}
+ void finish(int r) {
+ // Wait a little to improve chances of caller getting
+ // our response before seeing us disappear from mdsmap
+ sleep(1);
+
+ mds->respawn();
+ }
+ };
+
+ std::stringstream ds;
+ std::stringstream ss;
+ std::string prefix;
+ cmd_getval(cct, cmdmap, "prefix", prefix);
+
+ int r = 0;
+
+ if (prefix == "get_command_descriptions") {
+ int cmdnum = 0;
+ JSONFormatter *f = new JSONFormatter();
+ f->open_object_section("command_descriptions");
+ for (MDSCommand *cp = mds_commands;
+ cp < &mds_commands[ARRAY_SIZE(mds_commands)]; cp++) {
+
+ ostringstream secname;
+ secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
+ dump_cmddesc_to_json(f, secname.str(), cp->cmdstring, cp->helpstring,
+ cp->module, cp->perm, cp->availability);
+ cmdnum++;
+ }
+ f->close_section(); // command_descriptions
+
+ f->flush(ds);
+ delete f;
+ } else if (prefix == "injectargs") {
+ vector<string> argsvec;
+ cmd_getval(cct, cmdmap, "injected_args", argsvec);
+
+ if (argsvec.empty()) {
+ r = -EINVAL;
+ ss << "ignoring empty injectargs";
+ goto out;
+ }
+ string args = argsvec.front();
+ for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
+ args += " " + *a;
+ cct->_conf->injectargs(args, &ss);
+ } else if (prefix == "exit") {
+ // We will send response before executing
+ ss << "Exiting...";
+ *run_later = new SuicideLater(this);
+ }
+ else if (prefix == "respawn") {
+ // We will send response before executing
+ ss << "Respawning...";
+ *run_later = new RespawnLater(this);
+ } else if (prefix == "session kill") {
+ // FIXME harmonize `session kill` with admin socket session evict
+ int64_t session_id = 0;
+ bool got = cmd_getval(cct, cmdmap, "session_id", session_id);
+ assert(got);
+ Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
+
+ if (session) {
+ server->kill_session(session, NULL);
+ } else {
+ r = -ENOENT;
+ ss << "session '" << session_id << "' not found";
+ }
+ } else if (prefix == "heap") {
+ if (!ceph_using_tcmalloc()) {
+ r = -EOPNOTSUPP;
+ ss << "could not issue heap profiler command -- not using tcmalloc!";
+ } else {
+ string heapcmd;
+ cmd_getval(cct, cmdmap, "heapcmd", heapcmd);
+ vector<string> heapcmd_vec;
+ get_str_vec(heapcmd, heapcmd_vec);
+ ceph_heap_profiler_handle_command(heapcmd_vec, ds);
+ }
+ } else if (prefix == "cpu_profiler") {
+ string arg;
+ cmd_getval(cct, cmdmap, "arg", arg);
+ vector<string> argvec;
+ get_str_vec(arg, argvec);
+ cpu_profiler_handle_command(argvec, ds);
+ } else {
+ std::ostringstream ss;
+ ss << "unrecognized command! " << prefix;
+ r = -EINVAL;
+ }
+
+out:
+ *outs = ss.str();
+ outbl->append(ds);
+ return r;
+}
+
+/**
+ * Legacy "mds tell", takes a simple array of args
+ */
+int MDS::_handle_command_legacy(std::vector<std::string> args)
+{
+ dout(10) << "handle_command args: " << args << dendl;
+ if (args[0] == "injectargs") {
+ if (args.size() < 2) {
derr << "Ignoring empty injectargs!" << dendl;
}
else {
std::ostringstream oss;
mds_lock.Unlock();
- g_conf->injectargs(m->cmd[1], &oss);
+ g_conf->injectargs(args[1], &oss);
mds_lock.Lock();
derr << "injectargs:" << dendl;
derr << oss.str() << dendl;
}
}
- else if (m->cmd[0] == "dumpcache") {
- if (m->cmd.size() > 1)
- mdcache->dump_cache(m->cmd[1].c_str());
+ else if (args[0] == "dumpcache") {
+ if (args.size() > 1)
+ mdcache->dump_cache(args[1].c_str());
else
mdcache->dump_cache();
}
- else if (m->cmd[0] == "exit") {
+ else if (args[0] == "exit") {
suicide();
}
- else if (m->cmd[0] == "respawn") {
+ else if (args[0] == "respawn") {
respawn();
}
- else if (m->cmd[0] == "session" && m->cmd[1] == "kill") {
+ else if (args[0] == "session" && args[1] == "kill") {
Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
- strtol(m->cmd[2].c_str(), 0, 10)));
+ strtol(args[2].c_str(), 0, 10)));
if (session)
server->kill_session(session, NULL);
else
dout(15) << "session " << session << " not in sessionmap!" << dendl;
- } else if (m->cmd[0] == "issue_caps") {
- long inum = strtol(m->cmd[1].c_str(), 0, 10);
+ } else if (args[0] == "issue_caps") {
+ long inum = strtol(args[1].c_str(), 0, 10);
CInode *in = mdcache->get_inode(inodeno_t(inum));
if (in) {
bool r = locker->issue_caps(in);
dout(20) << "called issue_caps on inode " << inum
<< " with result " << r << dendl;
} else dout(15) << "inode " << inum << " not in mdcache!" << dendl;
- } else if (m->cmd[0] == "try_eval") {
- long inum = strtol(m->cmd[1].c_str(), 0, 10);
- int mask = strtol(m->cmd[2].c_str(), 0, 10);
+ } else if (args[0] == "try_eval") {
+ long inum = strtol(args[1].c_str(), 0, 10);
+ int mask = strtol(args[2].c_str(), 0, 10);
CInode * ino = mdcache->get_inode(inodeno_t(inum));
if (ino) {
locker->try_eval(ino, mask);
dout(20) << "try_eval(" << inum << ", " << mask << ")" << dendl;
} else dout(15) << "inode " << inum << " not in mdcache!" << dendl;
- } else if (m->cmd[0] == "fragment_dir") {
- if (m->cmd.size() == 4) {
- filepath fp(m->cmd[1].c_str());
+ } else if (args[0] == "fragment_dir") {
+ if (args.size() == 4) {
+ filepath fp(args[1].c_str());
CInode *in = mdcache->cache_traverse(fp);
if (in) {
frag_t fg;
- if (fg.parse(m->cmd[2].c_str())) {
+ if (fg.parse(args[2].c_str())) {
CDir *dir = in->get_dirfrag(fg);
if (dir) {
if (dir->is_auth()) {
- int by = atoi(m->cmd[3].c_str());
+ int by = atoi(args[3].c_str());
if (by)
mdcache->split_dir(dir, by);
else
dout(0) << "need to split by >0 bits" << dendl;
} else dout(0) << "dir " << dir->dirfrag() << " not auth" << dendl;
} else dout(0) << "dir " << in->ino() << " " << fg << " dne" << dendl;
- } else dout(0) << " frag " << m->cmd[2] << " does not parse" << dendl;
+ } else dout(0) << " frag " << args[2] << " does not parse" << dendl;
} else dout(0) << "path " << fp << " not found" << dendl;
} else dout(0) << "bad syntax" << dendl;
- } else if (m->cmd[0] == "merge_dir") {
- if (m->cmd.size() == 3) {
- filepath fp(m->cmd[1].c_str());
+ } else if (args[0] == "merge_dir") {
+ if (args.size() == 3) {
+ filepath fp(args[1].c_str());
CInode *in = mdcache->cache_traverse(fp);
if (in) {
frag_t fg;
- if (fg.parse(m->cmd[2].c_str())) {
+ if (fg.parse(args[2].c_str())) {
mdcache->merge_dir(in, fg);
- } else dout(0) << " frag " << m->cmd[2] << " does not parse" << dendl;
+ } else dout(0) << " frag " << args[2] << " does not parse" << dendl;
} else dout(0) << "path " << fp << " not found" << dendl;
} else dout(0) << "bad syntax" << dendl;
- } else if (m->cmd[0] == "export_dir") {
- if (m->cmd.size() == 3) {
- filepath fp(m->cmd[1].c_str());
- mds_rank_t target = mds_rank_t(atoi(m->cmd[2].c_str()));
+ } else if (args[0] == "export_dir") {
+ if (args.size() == 3) {
+ filepath fp(args[1].c_str());
+ mds_rank_t target = mds_rank_t(atoi(args[2].c_str()));
if (target != whoami && mdsmap->is_up(target) && mdsmap->is_in(target)) {
CInode *in = mdcache->cache_traverse(fp);
if (in) {
} else dout(0) << "bad export_dir target syntax" << dendl;
} else dout(0) << "bad export_dir syntax" << dendl;
}
- else if (m->cmd[0] == "cpu_profiler") {
+ else if (args[0] == "cpu_profiler") {
ostringstream ss;
- cpu_profiler_handle_command(m->cmd, ss);
+ cpu_profiler_handle_command(args, ss);
clog->info() << ss.str();
}
- else if (m->cmd[0] == "heap") {
- if (!ceph_using_tcmalloc())
- clog->info() << "tcmalloc not enabled, can't use heap profiler commands\n";
- else {
- ostringstream ss;
- vector<std::string> cmdargs;
- cmdargs.insert(cmdargs.begin(), m->cmd.begin()+1, m->cmd.end());
- ceph_heap_profiler_handle_command(cmdargs, ss);
- clog->info() << ss.str();
- }
- } else dout(0) << "unrecognized command! " << m->cmd << dendl;
- m->put();
+ else if (args[0] == "heap") {
+ if (!ceph_using_tcmalloc())
+ clog->info() << "tcmalloc not enabled, can't use heap profiler commands\n";
+ else {
+ ostringstream ss;
+ vector<std::string> cmdargs;
+ cmdargs.insert(cmdargs.begin(), args.begin()+1, args.end());
+ ceph_heap_profiler_handle_command(cmdargs, ss);
+ clog->info() << ss.str();
+ }
+ } else {
+ dout(0) << "unrecognized command! " << args << dendl;
+ }
+
+ return 0;
}
/* This function deletes the passed message before returning. */
break;
// OSD
+ case MSG_COMMAND:
+ handle_command(static_cast<MCommand*>(m));
+ break;
case CEPH_MSG_OSD_MAP:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
if (is_active() && snapserver)