]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr: improve handle_command
authorJohn Spray <john.spray@redhat.com>
Sun, 5 Mar 2017 15:38:36 +0000 (10:38 -0500)
committerKefu Chai <kchai@redhat.com>
Sun, 30 Apr 2017 02:21:30 +0000 (10:21 +0800)
Run the python module calls in a finisher so that
they don't block the daemonserver lock and so that
they can call back into mgr stuff if they need to.

Fix passing through commands to python modules, this
was giving EINVAL because only things with a MgrCommand
were getting let in.

Also fix get_command_descriptions, which was not
including the output of the formatter in the response.

Signed-off-by: John Spray <john.spray@redhat.com>
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/Mgr.cc

index fe0729d7484e936f983459c01928e2e20ac9481f..bb1927e25d6fa7e1405920f5061ffd6f45ec8c2f 100644 (file)
@@ -30,6 +30,7 @@
 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
 
 DaemonServer::DaemonServer(MonClient *monc_,
+                           Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
                           ClusterState &cluster_state_,
                           PyModules &py_modules_,
@@ -54,6 +55,7 @@ DaemonServer::DaemonServer(MonClient *monc_,
                                     g_conf->mgr_mon_messages)),
       msgr(nullptr),
       monc(monc_),
+      finisher(finisher_),
       daemon_state(daemon_state_),
       cluster_state(cluster_state_),
       py_modules(py_modules_),
@@ -371,40 +373,82 @@ bool DaemonServer::_allowed_command(
   return capable;
 }
 
-class ReplyOnFinish : public Context {
-  DaemonServer* mgr;
-  MCommand *m;
-  bufferlist odata;
-
-public:
-  bufferlist from_mon;
-  string outs;
-
-  ReplyOnFinish(DaemonServer* mgr, MCommand *m, bufferlist&& odata)
-    : mgr(mgr), m(m), odata(std::move(odata))
-  {}
-  void finish(int r) override {
-    odata.claim_append(from_mon);
-    mgr->_reply(m, r, outs, odata);
-  }
-};
-
 bool DaemonServer::handle_command(MCommand *m)
 {
   int r = 0;
   std::stringstream ss;
-  bufferlist odata;
   std::string prefix;
 
   assert(lock.is_locked_by_me());
 
-  cmdmap_t cmdmap;
+  /**
+   * The working data for processing an MCommand.  This lives in
+   * a class to enable passing it into other threads for processing
+   * outside of the thread/locks that called handle_command.
+   */
+  class CommandContext
+  {
+    public:
+    MCommand *m;
+    bufferlist odata;
+    cmdmap_t cmdmap;
+
+    CommandContext(MCommand *m_)
+      : m(m_)
+    {
+    }
+
+    ~CommandContext()
+    {
+      m->put();
+    }
+
+    void reply(int r, const std::stringstream &ss)
+    {
+      reply(r, ss.str());
+    }
+
+    void reply(int r, const std::string &rs)
+    {
+      // Let the connection drop as soon as we've sent our response
+      ConnectionRef con = m->get_connection();
+      if (con) {
+        con->mark_disposable();
+      }
+
+      dout(1) << "do_command r=" << r << " " << rs << dendl;
+      if (con) {
+        MCommandReply *reply = new MCommandReply(r, rs);
+        reply->set_tid(m->get_tid());
+        reply->set_data(odata);
+        con->send_message(reply);
+      }
+    }
+  };
+
+  /**
+   * A context for receiving a bufferlist/error string from a background
+   * function and then calling back to a CommandContext when it's done
+   */
+  class ReplyOnFinish : public Context {
+    std::shared_ptr<CommandContext> cmdctx;
+
+  public:
+    bufferlist from_mon;
+    string outs;
+
+    ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
+      : cmdctx(cmdctx_)
+    {}
+    void finish(int r) override {
+      cmdctx->odata.claim_append(from_mon);
+      cmdctx->reply(r, outs);
+    }
+  };
 
-  // TODO background the call into python land so that we don't
-  // block a messenger thread on python code.
+  std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
 
-  ConnectionRef con = m->get_connection();
-  MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
+  MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
   if (!session) {
     return true;
   }
@@ -412,23 +456,23 @@ bool DaemonServer::handle_command(MCommand *m)
   if (session->inst.name == entity_name_t())
     session->inst.name = m->get_source();
 
-  string format;
+  std::string format;
   boost::scoped_ptr<Formatter> f;
-  const MgrCommand *mgr_cmd;
   map<string,string> param_str_map;
 
-  if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
-    return _reply(m, -EINVAL, ss.str(), odata);
+  if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
+    cmdctx->reply(-EINVAL, ss);
+    return true;
   }
 
   {
-    cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
     f.reset(Formatter::create(format));
   }
 
-  dout(4) << "decoded " << cmdmap.size() << dendl;
-  cmd_getval(cct, cmdmap, "prefix", prefix);
+  cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
 
+  dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
   dout(4) << "prefix=" << prefix << dendl;
 
   if (prefix == "get_command_descriptions") {
@@ -460,26 +504,36 @@ bool DaemonServer::handle_command(MCommand *m)
     }
 #endif
     f.close_section(); // command_descriptions
-    f.flush(odata);
-    return _reply(m, r, ss.str(), odata);
+    f.flush(cmdctx->odata);
+    cmdctx->reply(0, ss);
+    return true;
   }
 
   // lookup command
-  mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
+  const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
                                               ARRAY_SIZE(mgr_commands));
-  _generate_command_map(cmdmap, param_str_map);
+  _generate_command_map(cmdctx->cmdmap, param_str_map);
   if (!mgr_cmd) {
-    return _reply(m, -EINVAL, "command not supported", odata);
-  }
-
-  // validate user's permissions for requested command
-  if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdmap,
-                        param_str_map, mgr_cmd)) {
-    dout(1) << __func__ << " access denied" << dendl;
-    audit_clog->info() << "from='" << session->inst << "' "
-                      << "entity='" << session->entity_name << "' "
-                      << "cmd=" << m->cmd << ":  access denied";
-    return _reply(m, -EACCES, "access denied", odata);
+    MgrCommand py_command = {"", "", "py", "rw", "cli"};
+    if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
+                          param_str_map, &py_command)) {
+      dout(1) << " access denied" << dendl;
+      ss << "access denied";
+      cmdctx->reply(-EACCES, ss);
+      return true;
+    }
+  } else {
+    // validate user's permissions for requested command
+    if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
+                          param_str_map, mgr_cmd)) {
+      dout(1) << " access denied" << dendl;
+      audit_clog->info() << "from='" << session->inst << "' "
+                         << "entity='" << session->entity_name << "' "
+                         << "cmd=" << m->cmd << ":  access denied";
+      ss << "access denied";
+      cmdctx->reply(-EACCES, ss);
+      return true;
+    }
   }
 
   audit_clog->debug()
@@ -496,10 +550,11 @@ bool DaemonServer::handle_command(MCommand *m)
     string scrubop = prefix.substr(3, string::npos);
     pg_t pgid;
     string pgidstr;
-    cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
     if (!pgid.parse(pgidstr.c_str())) {
       ss << "invalid pgid '" << pgidstr << "'";
-      return _reply(m, -EINVAL, ss.str(), odata);
+      cmdctx->reply(-EINVAL, ss);
+      return true;
     }
     bool pg_exists = false;
     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
@@ -507,7 +562,8 @@ bool DaemonServer::handle_command(MCommand *m)
       });
     if (!pg_exists) {
       ss << "pg " << pgid << " dne";
-      return _reply(m, -ENOENT, ss.str(), odata);
+      cmdctx->reply(-ENOENT, ss);
+      return true;
     }
     int acting_primary = -1;
     entity_inst_t inst;
@@ -519,7 +575,8 @@ bool DaemonServer::handle_command(MCommand *m)
       });
     if (acting_primary == -1) {
       ss << "pg " << pgid << " has no primary osd";
-      return _reply(m, -EAGAIN, ss.str(), odata);
+      cmdctx->reply(-EAGAIN, ss);
+      return true;
     }
     vector<pg_t> pgs = { pgid };
     msgr->send_message(new MOSDScrub(monc->get_fsid(),
@@ -529,7 +586,8 @@ bool DaemonServer::handle_command(MCommand *m)
                       inst);
     ss << "instructing pg " << pgid << " on osd." << acting_primary
        << " (" << inst << ") to " << scrubop;
-    return _reply(m, 0, ss.str(), odata);
+    cmdctx->reply(0, ss);
+    return true;
   } else if (prefix == "osd reweight-by-pg" ||
             prefix == "osd reweight-by-utilization" ||
             prefix == "osd test-reweight-by-pg" ||
@@ -540,10 +598,10 @@ bool DaemonServer::handle_command(MCommand *m)
       prefix == "osd test-reweight-by-pg" ||
       prefix == "osd test-reweight-by-utilization";
     int64_t oload;
-    cmd_getval(g_ceph_context, cmdmap, "oload", oload, int64_t(120));
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
     set<int64_t> pools;
     vector<string> poolnames;
-    cmd_getval(g_ceph_context, cmdmap, "pools", poolnames);
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
        for (const auto& poolname : poolnames) {
          int64_t pool = osdmap.lookup_pg_pool_name(poolname);
@@ -555,22 +613,25 @@ bool DaemonServer::handle_command(MCommand *m)
        }
       });
     if (r) {
-      return _reply(m, r, ss.str(), odata);
+      cmdctx->reply(r, ss);
+      return true;
     }
     double max_change = g_conf->mon_reweight_max_change;
-    cmd_getval(g_ceph_context, cmdmap, "max_change", max_change);
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
     if (max_change <= 0.0) {
       ss << "max_change " << max_change << " must be positive";
-      return _reply(m, -EINVAL, ss.str(), odata);
+      cmdctx->reply(-EINVAL, ss);
+      return true;
     }
     int64_t max_osds = g_conf->mon_reweight_max_osds;
-    cmd_getval(g_ceph_context, cmdmap, "max_osds", max_osds);
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
     if (max_osds <= 0) {
       ss << "max_osds " << max_osds << " must be positive";
-      return _reply(m, -EINVAL, ss.str(), odata);
+      cmdctx->reply(-EINVAL, ss);
+      return true;
     }
     string no_increasing;
-    cmd_getval(g_ceph_context, cmdmap, "no_increasing", no_increasing);
+    cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
     string out_str;
     mempool::osdmap::map<int32_t, uint32_t> new_weights;
     r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
@@ -589,16 +650,19 @@ bool DaemonServer::handle_command(MCommand *m)
     if (r >= 0) {
       dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
     }
-    if (f)
-      f->flush(odata);
-    else
-      odata.append(out_str);
+    if (f) {
+      f->flush(cmdctx->odata);
+    } else {
+      cmdctx->odata.append(out_str);
+    }
     if (r < 0) {
       ss << "FAILED reweight-by-pg";
-      return _reply(m, r, ss.str(), odata);
+      cmdctx->reply(r, ss);
+      return true;
     } else if (r == 0 || dry_run) {
       ss << "no change";
-      return _reply(m, r, ss.str(), odata);
+      cmdctx->reply(r, ss);
+      return true;
     } else {
       json_spirit::Object json_object;
       for (const auto& osd_weight : new_weights) {
@@ -613,7 +677,7 @@ bool DaemonServer::handle_command(MCommand *m)
        "\"prefix\": \"osd reweightn\", "
        "\"weights\": \"" + s + "\""
        "}";
-      auto on_finish = new ReplyOnFinish(this, m, std::move(odata));
+      auto on_finish = new ReplyOnFinish(cmdctx);
       monc->start_mon_command({cmd}, {},
                              &on_finish->from_mon, &on_finish->outs, on_finish);
       return true;
@@ -621,63 +685,46 @@ bool DaemonServer::handle_command(MCommand *m)
   } else {
     r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
        return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
-           return process_pg_map_command(prefix, cmdmap, pg_map, osdmap,
-                                         f.get(), &ss, &odata);
+           return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
+                                         f.get(), &ss, &cmdctx->odata);
          });
       });
-  }
-  if (r != -EOPNOTSUPP)
-      return _reply(m, r, ss.str(), odata);
-  // fall back to registered python handlers
-  else {
-    // Let's find you a handler!
-    MgrPyModule *handler = nullptr;
-    auto py_commands = py_modules.get_commands();
-    for (const auto &pyc : py_commands) {
-      auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
-      dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
-      if (pyc_prefix == prefix) {
-        handler = pyc.handler;
-        break;
-      }
-    }
 
-    if (handler == nullptr) {
-      ss << "No handler found for '" << prefix << "'";
-      dout(4) << "No handler found for '" << prefix << "'" << dendl;
-      return _reply(m, -EINVAL, ss.str(), odata);
+    if (r != -EOPNOTSUPP) {
+      cmdctx->reply(r, ss);
+      return true;
     }
+  }
 
-    // FIXME: go run this python part in another thread, not inline
-    // with a ms_dispatch, so that the python part can block if it
-    // wants to.
-    dout(4) << "passing through " << cmdmap.size() << dendl;
-    stringstream ds;
-    r = handler->handle_command(cmdmap, &ds, &ss);
-    odata.append(ds);
-    return _reply(m, 0, ss.str(), odata);
+  // None of the special native commands, 
+  MgrPyModule *handler = nullptr;
+  auto py_commands = py_modules.get_commands();
+  for (const auto &pyc : py_commands) {
+    auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
+    dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
+    if (pyc_prefix == prefix) {
+      handler = pyc.handler;
+      break;
+    }
   }
-}
 
-bool DaemonServer::_reply(MCommand* m,
-                         int ret,
-                         const std::string& s,
-                         const bufferlist& payload)
-{
-  dout(1) << __func__ << " r=" << ret << " " << s << dendl;
-  auto con = m->get_connection();
-  if (!con) {
-    dout(10) << __func__ << " connection dropped for command" << dendl;
-    m->put();
+  if (handler == nullptr) {
+    ss << "No handler found for '" << prefix << "'";
+    dout(4) << "No handler found for '" << prefix << "'" << dendl;
+    cmdctx->reply(-EINVAL, ss);
+    return true;
+  } else {
+    // Okay, now we have a handler to call, but we must not call it
+    // in this thread, because the python handlers can do anything,
+    // including blocking, and including calling back into mgr.
+    dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
+    finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
+      std::stringstream ds;
+      std::stringstream ss;
+      int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
+      cmdctx->odata.append(ds);
+      cmdctx->reply(r, ss);
+    }));
     return true;
   }
-  // Let the connection drop as soon as we've sent our response
-  con->mark_disposable();
-
-  auto response = new MCommandReply(ret, s);
-  response->set_tid(m->get_tid());
-  response->set_data(payload);
-  con->send_message(response);
-  m->put();
-  return true;
 }
index af6d383ba506018f6b47c5fc2d0ec82d28cf121a..0ef4a5655ee14739b6dbfaa8fc8773e9e26b6eec 100644 (file)
@@ -54,6 +54,7 @@ protected:
 
   Messenger *msgr;
   MonClient *monc;
+  Finisher  &finisher;
   DaemonStateIndex &daemon_state;
   ClusterState &cluster_state;
   PyModules &py_modules;
@@ -85,6 +86,7 @@ public:
   entity_addr_t get_myaddr() const;
 
   DaemonServer(MonClient *monc_,
+               Finisher &finisher_,
               DaemonStateIndex &daemon_state_,
               ClusterState &cluster_state_,
               PyModules &py_modules_,
index a9dd325c4a69717c7607e0f62d1b2f89e9b5ca8b..f507a3526663782690fe4e229b23c2ce937e777a 100644 (file)
@@ -48,7 +48,8 @@ Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_,
   finisher(g_ceph_context, "Mgr", "mgr-fin"),
   py_modules(daemon_state, cluster_state, *monc, finisher),
   cluster_state(monc, nullptr),
-  server(monc, daemon_state, cluster_state, py_modules, clog_, audit_clog_),
+  server(monc, finisher, daemon_state, cluster_state, py_modules,
+         clog_, audit_clog_),
   initialized(false),
   initializing(false)
 {