osd_compat(get_osd_compat_set()),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
get_num_op_threads()),
- command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
heartbeat_stop(false),
heartbeat_need_update(true),
hb_front_client_messenger(hb_client_front),
up_thru_wanted(0),
requested_full_first(0),
requested_full_last(0),
- command_wq(
- this,
- cct->_conf->osd_command_thread_timeout,
- cct->_conf->osd_command_thread_suicide_timeout,
- &command_tp),
service(this)
{
}
osd_op_tp.start();
- command_tp.start();
// start the heartbeat
heartbeat_thread.create("osd_srv_heartbt");
osd_op_tp.stop();
dout(10) << "op sharded tp stopped" << dendl;
- command_tp.drain();
- command_tp.stop();
- dout(10) << "command tp stopped" << dendl;
-
dout(10) << "stopping agent" << dendl;
service.agent_stop();
m->put();
}
-struct OSDCommand {
- string cmdstring;
- string helpstring;
- string module;
- string perm;
-} osd_commands[] = {
-
-#define COMMAND(parsesig, helptext, module, perm) \
- {parsesig, helptext, module, perm},
-
-COMMAND("perf histogram dump "
- "name=logger,type=CephString,req=false "
- "name=counter,type=CephString,req=false",
- "Get histogram data",
- "osd", "r")
-
-// tell <osd.n> commands. Validation of osd.n must be special-cased in client
-COMMAND("version", "report version of OSD", "osd", "r")
-COMMAND("get_command_descriptions", "list commands descriptions", "osd", "r")
-COMMAND("injectargs " \
- "name=injected_args,type=CephString,n=N",
- "inject configuration arguments into running OSD",
- "osd", "rw")
-COMMAND("config set " \
- "name=key,type=CephString name=value,type=CephString",
- "Set a configuration option at runtime (not persistent)",
- "osd", "rw")
-COMMAND("config get " \
- "name=key,type=CephString",
- "Get a configuration option at runtime",
- "osd", "r")
-COMMAND("config unset " \
- "name=key,type=CephString",
- "Unset a configuration option at runtime (not persistent)",
- "osd", "rw")
-COMMAND("smart name=devid,type=CephString,req=false",
- "runs smartctl on this osd devices. ",
- "osd", "rw")
-};
-
-void OSD::do_command(
- Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data)
-{
- dout(20) << "do_command tid " << tid << " " << cmd << dendl;
-
- int r = 0;
- stringstream ss, ds;
- bufferlist odata;
- cmdmap_t cmdmap;
- if (cmd.empty()) {
- ss << "no command given";
- goto out;
- }
- if (!cmdmap_from_json(cmd, &cmdmap, ss)) {
- r = -EINVAL;
- goto out;
- }
-
- try {
- r = _do_command(con, cmdmap, tid, data, odata, ss, ds);
- } catch (const bad_cmd_get& e) {
- r = -EINVAL;
- ss << e.what();
- }
- if (r == -EAGAIN) {
- return;
- }
- out:
- string rs = ss.str();
- odata.append(ds);
- dout(0) << "do_command r=" << r << " " << rs << dendl;
- clog->info() << rs;
- if (con) {
- MCommandReply *reply = new MCommandReply(r, rs);
- reply->set_tid(tid);
- reply->set_data(odata);
- con->send_message(reply);
- }
-}
-
namespace {
class unlock_guard {
ceph::mutex& m;
};
}
-int OSD::_do_command(
- Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data,
- bufferlist& odata, stringstream& ss, stringstream& ds)
-{
- int r = 0;
- string prefix;
- string format;
- string pgidstr;
- boost::scoped_ptr<Formatter> f;
-
- cmd_getval(cct, cmdmap, "prefix", prefix);
-
- if (prefix == "get_command_descriptions") {
- int cmdnum = 0;
- JSONFormatter *f = new JSONFormatter();
- f->open_object_section("command_descriptions");
- for (OSDCommand *cp = osd_commands;
- cp < &osd_commands[std::size(osd_commands)]; cp++) {
-
- ostringstream secname;
- secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
- dump_cmddesc_to_json(f, con->get_features(),
- secname.str(), cp->cmdstring, cp->helpstring,
- cp->module, cp->perm, 0);
- cmdnum++;
- }
- f->close_section(); // command_descriptions
-
- f->flush(ds);
- delete f;
- goto out;
- }
-
- cmd_getval(cct, cmdmap, "format", format);
- f.reset(Formatter::create(format));
-
- if (prefix == "version") {
- if (f) {
- f->open_object_section("version");
- f->dump_string("version", pretty_version_to_str());
- f->close_section();
- f->flush(ds);
- } else {
- ds << pretty_version_to_str();
- }
- goto out;
- }
- else if (prefix == "injectargs") {
- vector<string> argsvec;
- cmd_getval(cct, cmdmap, "injected_args", argsvec);
-
- if (argsvec.empty()) {
- r = -EINVAL;
- ss << "ignoring empty injectargs";
- goto out;
- }
- string args = argsvec.front();
- for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
- args += " " + *a;
- unlock_guard unlock{osd_lock};
- r = cct->_conf.injectargs(args, &ss);
- }
- else if (prefix == "config set") {
- std::string key;
- std::string val;
- cmd_getval(cct, cmdmap, "key", key);
- cmd_getval(cct, cmdmap, "value", val);
- unlock_guard unlock{osd_lock};
- r = cct->_conf.set_val(key, val, &ss);
- if (r == 0) {
- cct->_conf.apply_changes(nullptr);
- }
- }
- else if (prefix == "config get") {
- std::string key;
- cmd_getval(cct, cmdmap, "key", key);
- unlock_guard unlock{osd_lock};
- std::string val;
- r = cct->_conf.get_val(key, &val);
- if (r == 0) {
- ds << val;
- }
- }
- else if (prefix == "config unset") {
- std::string key;
- cmd_getval(cct, cmdmap, "key", key);
- unlock_guard unlock{osd_lock};
- r = cct->_conf.rm_val(key);
- if (r == 0) {
- cct->_conf.apply_changes(nullptr);
- }
- if (r == -ENOENT) {
- r = 0; // make command idempotent
- }
- }
-
- else if (prefix == "smart") {
- string devid;
- cmd_getval(cct, cmdmap, "devid", devid);
- probe_smart(devid, ds);
- }
-
-
- else {
- ss << "unrecognized command '" << prefix << "'";
- r = -ENOSYS;
- }
-
- out:
- return r;
-}
-
void OSD::scrub_purged_snaps()
{
dout(10) << __func__ << dendl;
dout(10) << "ping from " << m->get_source() << dendl;
m->put();
return;
- case MSG_MON_COMMAND:
- handle_command(static_cast<MMonCommand*>(m));
- return;
case MSG_OSD_FORCE_RECOVERY:
handle_fast_force_recovery(static_cast<MOSDForceRecovery*>(m));
return;
private:
ShardedThreadPool osd_op_tp;
- ThreadPool command_tp;
void get_latest_osdmap();
void handle_fast_force_recovery(MOSDForceRecovery *m);
// -- commands --
- struct Command {
- vector<string> cmd;
- ceph_tid_t tid;
- bufferlist indata;
- ConnectionRef con;
-
- Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
- : cmd(c), tid(t), indata(bl), con(co) {}
- };
- list<Command*> command_queue;
- struct CommandWQ : public ThreadPool::WorkQueue<Command> {
- OSD *osd;
- CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
-
- bool _empty() override {
- return osd->command_queue.empty();
- }
- bool _enqueue(Command *c) override {
- osd->command_queue.push_back(c);
- return true;
- }
- void _dequeue(Command *pg) override {
- ceph_abort();
- }
- Command *_dequeue() override {
- if (osd->command_queue.empty())
- return NULL;
- Command *c = osd->command_queue.front();
- osd->command_queue.pop_front();
- return c;
- }
- void _process(Command *c, ThreadPool::TPHandle &) override {
- osd->osd_lock.lock();
- if (osd->is_stopping()) {
- osd->osd_lock.unlock();
- delete c;
- return;
- }
- osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
- osd->osd_lock.unlock();
- delete c;
- }
- void _clear() override {
- while (!osd->command_queue.empty()) {
- Command *c = osd->command_queue.front();
- osd->command_queue.pop_front();
- delete c;
- }
- }
- } command_wq;
-
void handle_command(class MCommand *m);
- void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
- int _do_command(
- Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data,
- bufferlist& odata, stringstream& ss, stringstream& ds);
// -- pg recovery --