#define dout_prefix *_dout << "mgr.server " << __func__ << " "
DaemonServer::DaemonServer(MonClient *monc_,
+ Finisher &finisher_,
DaemonStateIndex &daemon_state_,
ClusterState &cluster_state_,
PyModules &py_modules_,
g_conf->mgr_mon_messages)),
msgr(nullptr),
monc(monc_),
+ finisher(finisher_),
daemon_state(daemon_state_),
cluster_state(cluster_state_),
py_modules(py_modules_),
return capable;
}
-class ReplyOnFinish : public Context {
- DaemonServer* mgr;
- MCommand *m;
- bufferlist odata;
-
-public:
- bufferlist from_mon;
- string outs;
-
- ReplyOnFinish(DaemonServer* mgr, MCommand *m, bufferlist&& odata)
- : mgr(mgr), m(m), odata(std::move(odata))
- {}
- void finish(int r) override {
- odata.claim_append(from_mon);
- mgr->_reply(m, r, outs, odata);
- }
-};
-
bool DaemonServer::handle_command(MCommand *m)
{
int r = 0;
std::stringstream ss;
- bufferlist odata;
std::string prefix;
assert(lock.is_locked_by_me());
- cmdmap_t cmdmap;
+ /**
+ * The working data for processing an MCommand. This lives in
+ * a class to enable passing it into other threads for processing
+ * outside of the thread/locks that called handle_command.
+ */
+ class CommandContext
+ {
+ public:
+ MCommand *m;
+ bufferlist odata;
+ cmdmap_t cmdmap;
+
+ CommandContext(MCommand *m_)
+ : m(m_)
+ {
+ }
+
+ ~CommandContext()
+ {
+ m->put();
+ }
+
+ void reply(int r, const std::stringstream &ss)
+ {
+ reply(r, ss.str());
+ }
+
+ void reply(int r, const std::string &rs)
+ {
+ // Let the connection drop as soon as we've sent our response
+ ConnectionRef con = m->get_connection();
+ if (con) {
+ con->mark_disposable();
+ }
+
+ dout(1) << "do_command r=" << r << " " << rs << dendl;
+ if (con) {
+ MCommandReply *reply = new MCommandReply(r, rs);
+ reply->set_tid(m->get_tid());
+ reply->set_data(odata);
+ con->send_message(reply);
+ }
+ }
+ };
+
+ /**
+ * A context for receiving a bufferlist/error string from a background
+ * function and then calling back to a CommandContext when it's done
+ */
+ class ReplyOnFinish : public Context {
+ std::shared_ptr<CommandContext> cmdctx;
+
+ public:
+ bufferlist from_mon;
+ string outs;
+
+ ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
+ : cmdctx(cmdctx_)
+ {}
+ void finish(int r) override {
+ cmdctx->odata.claim_append(from_mon);
+ cmdctx->reply(r, outs);
+ }
+ };
- // TODO background the call into python land so that we don't
- // block a messenger thread on python code.
+ std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
- ConnectionRef con = m->get_connection();
- MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
+ MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
if (!session) {
return true;
}
if (session->inst.name == entity_name_t())
session->inst.name = m->get_source();
- string format;
+ std::string format;
boost::scoped_ptr<Formatter> f;
- const MgrCommand *mgr_cmd;
map<string,string> param_str_map;
- if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
- return _reply(m, -EINVAL, ss.str(), odata);
+ if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
+ cmdctx->reply(-EINVAL, ss);
+ return true;
}
{
- cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
f.reset(Formatter::create(format));
}
- dout(4) << "decoded " << cmdmap.size() << dendl;
- cmd_getval(cct, cmdmap, "prefix", prefix);
+ cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
+ dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
dout(4) << "prefix=" << prefix << dendl;
if (prefix == "get_command_descriptions") {
}
#endif
f.close_section(); // command_descriptions
- f.flush(odata);
- return _reply(m, r, ss.str(), odata);
+ f.flush(cmdctx->odata);
+ cmdctx->reply(0, ss);
+ return true;
}
// lookup command
- mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
+ const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
ARRAY_SIZE(mgr_commands));
- _generate_command_map(cmdmap, param_str_map);
+ _generate_command_map(cmdctx->cmdmap, param_str_map);
if (!mgr_cmd) {
- return _reply(m, -EINVAL, "command not supported", odata);
- }
-
- // validate user's permissions for requested command
- if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdmap,
- param_str_map, mgr_cmd)) {
- dout(1) << __func__ << " access denied" << dendl;
- audit_clog->info() << "from='" << session->inst << "' "
- << "entity='" << session->entity_name << "' "
- << "cmd=" << m->cmd << ": access denied";
- return _reply(m, -EACCES, "access denied", odata);
+ MgrCommand py_command = {"", "", "py", "rw", "cli"};
+ if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
+ param_str_map, &py_command)) {
+ dout(1) << " access denied" << dendl;
+ ss << "access denied";
+ cmdctx->reply(-EACCES, ss);
+ return true;
+ }
+ } else {
+ // validate user's permissions for requested command
+ if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
+ param_str_map, mgr_cmd)) {
+ dout(1) << " access denied" << dendl;
+ audit_clog->info() << "from='" << session->inst << "' "
+ << "entity='" << session->entity_name << "' "
+ << "cmd=" << m->cmd << ": access denied";
+ ss << "access denied";
+ cmdctx->reply(-EACCES, ss);
+ return true;
+ }
}
audit_clog->debug()
string scrubop = prefix.substr(3, string::npos);
pg_t pgid;
string pgidstr;
- cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
if (!pgid.parse(pgidstr.c_str())) {
ss << "invalid pgid '" << pgidstr << "'";
- return _reply(m, -EINVAL, ss.str(), odata);
+ cmdctx->reply(-EINVAL, ss);
+ return true;
}
bool pg_exists = false;
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
});
if (!pg_exists) {
ss << "pg " << pgid << " dne";
- return _reply(m, -ENOENT, ss.str(), odata);
+ cmdctx->reply(-ENOENT, ss);
+ return true;
}
int acting_primary = -1;
entity_inst_t inst;
});
if (acting_primary == -1) {
ss << "pg " << pgid << " has no primary osd";
- return _reply(m, -EAGAIN, ss.str(), odata);
+ cmdctx->reply(-EAGAIN, ss);
+ return true;
}
vector<pg_t> pgs = { pgid };
msgr->send_message(new MOSDScrub(monc->get_fsid(),
inst);
ss << "instructing pg " << pgid << " on osd." << acting_primary
<< " (" << inst << ") to " << scrubop;
- return _reply(m, 0, ss.str(), odata);
+ cmdctx->reply(0, ss);
+ return true;
} else if (prefix == "osd reweight-by-pg" ||
prefix == "osd reweight-by-utilization" ||
prefix == "osd test-reweight-by-pg" ||
prefix == "osd test-reweight-by-pg" ||
prefix == "osd test-reweight-by-utilization";
int64_t oload;
- cmd_getval(g_ceph_context, cmdmap, "oload", oload, int64_t(120));
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
set<int64_t> pools;
vector<string> poolnames;
- cmd_getval(g_ceph_context, cmdmap, "pools", poolnames);
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
for (const auto& poolname : poolnames) {
int64_t pool = osdmap.lookup_pg_pool_name(poolname);
}
});
if (r) {
- return _reply(m, r, ss.str(), odata);
+ cmdctx->reply(r, ss);
+ return true;
}
double max_change = g_conf->mon_reweight_max_change;
- cmd_getval(g_ceph_context, cmdmap, "max_change", max_change);
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
if (max_change <= 0.0) {
ss << "max_change " << max_change << " must be positive";
- return _reply(m, -EINVAL, ss.str(), odata);
+ cmdctx->reply(-EINVAL, ss);
+ return true;
}
int64_t max_osds = g_conf->mon_reweight_max_osds;
- cmd_getval(g_ceph_context, cmdmap, "max_osds", max_osds);
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
if (max_osds <= 0) {
ss << "max_osds " << max_osds << " must be positive";
- return _reply(m, -EINVAL, ss.str(), odata);
+ cmdctx->reply(-EINVAL, ss);
+ return true;
}
string no_increasing;
- cmd_getval(g_ceph_context, cmdmap, "no_increasing", no_increasing);
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
string out_str;
mempool::osdmap::map<int32_t, uint32_t> new_weights;
r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
if (r >= 0) {
dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
}
- if (f)
- f->flush(odata);
- else
- odata.append(out_str);
+ if (f) {
+ f->flush(cmdctx->odata);
+ } else {
+ cmdctx->odata.append(out_str);
+ }
if (r < 0) {
ss << "FAILED reweight-by-pg";
- return _reply(m, r, ss.str(), odata);
+ cmdctx->reply(r, ss);
+ return true;
} else if (r == 0 || dry_run) {
ss << "no change";
- return _reply(m, r, ss.str(), odata);
+ cmdctx->reply(r, ss);
+ return true;
} else {
json_spirit::Object json_object;
for (const auto& osd_weight : new_weights) {
"\"prefix\": \"osd reweightn\", "
"\"weights\": \"" + s + "\""
"}";
- auto on_finish = new ReplyOnFinish(this, m, std::move(odata));
+ auto on_finish = new ReplyOnFinish(cmdctx);
monc->start_mon_command({cmd}, {},
&on_finish->from_mon, &on_finish->outs, on_finish);
return true;
} else {
r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
- return process_pg_map_command(prefix, cmdmap, pg_map, osdmap,
- f.get(), &ss, &odata);
+ return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
+ f.get(), &ss, &cmdctx->odata);
});
});
- }
- if (r != -EOPNOTSUPP)
- return _reply(m, r, ss.str(), odata);
- // fall back to registered python handlers
- else {
- // Let's find you a handler!
- MgrPyModule *handler = nullptr;
- auto py_commands = py_modules.get_commands();
- for (const auto &pyc : py_commands) {
- auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
- dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
- if (pyc_prefix == prefix) {
- handler = pyc.handler;
- break;
- }
- }
- if (handler == nullptr) {
- ss << "No handler found for '" << prefix << "'";
- dout(4) << "No handler found for '" << prefix << "'" << dendl;
- return _reply(m, -EINVAL, ss.str(), odata);
+ if (r != -EOPNOTSUPP) {
+ cmdctx->reply(r, ss);
+ return true;
}
+ }
- // FIXME: go run this python part in another thread, not inline
- // with a ms_dispatch, so that the python part can block if it
- // wants to.
- dout(4) << "passing through " << cmdmap.size() << dendl;
- stringstream ds;
- r = handler->handle_command(cmdmap, &ds, &ss);
- odata.append(ds);
- return _reply(m, 0, ss.str(), odata);
+ // None of the special native commands,
+ MgrPyModule *handler = nullptr;
+ auto py_commands = py_modules.get_commands();
+ for (const auto &pyc : py_commands) {
+ auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
+ dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
+ if (pyc_prefix == prefix) {
+ handler = pyc.handler;
+ break;
+ }
}
-}
-bool DaemonServer::_reply(MCommand* m,
- int ret,
- const std::string& s,
- const bufferlist& payload)
-{
- dout(1) << __func__ << " r=" << ret << " " << s << dendl;
- auto con = m->get_connection();
- if (!con) {
- dout(10) << __func__ << " connection dropped for command" << dendl;
- m->put();
+ if (handler == nullptr) {
+ ss << "No handler found for '" << prefix << "'";
+ dout(4) << "No handler found for '" << prefix << "'" << dendl;
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ } else {
+ // Okay, now we have a handler to call, but we must not call it
+ // in this thread, because the python handlers can do anything,
+ // including blocking, and including calling back into mgr.
+ dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
+ finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
+ std::stringstream ds;
+ std::stringstream ss;
+ int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
+ cmdctx->odata.append(ds);
+ cmdctx->reply(r, ss);
+ }));
return true;
}
- // Let the connection drop as soon as we've sent our response
- con->mark_disposable();
-
- auto response = new MCommandReply(ret, s);
- response->set_tid(m->get_tid());
- response->set_data(payload);
- con->send_message(response);
- m->put();
- return true;
}