]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: enable multiple python modules
authorJohn Spray <john.spray@redhat.com>
Mon, 4 Jul 2016 14:22:27 +0000 (15:22 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 29 Sep 2016 16:26:58 +0000 (17:26 +0100)
serve() each one in a separate thread, include a shutdown()
hook so that we can tear down cleanly.

Signed-off-by: John Spray <john.spray@redhat.com>
src/ceph_mgr.cc
src/mgr/Mgr.cc
src/mgr/Mgr.h
src/mgr/MgrPyModule.cc
src/mgr/MgrPyModule.h
src/mgr/PyModules.cc
src/mgr/PyModules.h

index 4a0fc758b7a9e818221c0267b6ebf7d4b7bb90f7..4b4c62a61b40cfed937e33ac3a9be29355f1cb74 100644 (file)
@@ -23,6 +23,8 @@
 #include "global/global_init.h"
 
 
+
+
 int main(int argc, const char **argv)
 {
   vector<const char*> args;
@@ -55,13 +57,6 @@ int main(int argc, const char **argv)
   }
 
   // Finally, execute the user's commands
-  rc = mgr.main(args);
-  if (rc != 0) {
-    std::cerr << "Error (" << cpp_strerror(rc) << ")" << std::endl;
-  }
-
-  mgr.shutdown();
-
-  return rc;
+  return mgr.main(args);
 }
 
index 35c6b075ba20d43050d113e71c0baabfed0497f3..751979993687ebf736aede9b2a2c3f6732d1f455 100644 (file)
@@ -15,6 +15,7 @@
 #include "mon/MonClient.h"
 #include "include/stringify.h"
 #include "global/global_context.h"
+#include "global/signal_handler.h"
 
 #include "mgr/MgrContext.h"
 
@@ -213,6 +214,8 @@ int Mgr::init()
 
   finisher.start();
 
+  py_modules.init();
+
   dout(4) << "Complete." << dendl;
   return 0;
 }
@@ -339,6 +342,12 @@ void Mgr::load_config()
   py_modules.insert_config(loaded);
 }
 
+void Mgr::handle_signal(int signum)
+{
+  Mutex::Locker l(lock);
+  assert(signum == SIGINT || signum == SIGTERM);
+  shutdown();
+}
 
 void Mgr::shutdown()
 {
@@ -349,14 +358,13 @@ void Mgr::shutdown()
   // to touch references to the things we're about to tear down
   finisher.stop();
 
-  lock.Lock();
+  //lock.Lock();
   timer.shutdown();
   objecter->shutdown();
-  lock.Unlock();
+  //lock.Unlock();
 
   monc->shutdown();
   client_messenger->shutdown();
-  client_messenger->wait();
 }
 
 void Mgr::handle_osd_map()
@@ -552,10 +560,33 @@ bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
   return *authorizer != NULL;
 }
 
+// A reference for use by the signal handler
+Mgr *signal_mgr = nullptr;
+
+static void handle_mgr_signal(int signum)
+{
+  if (signal_mgr) {
+    signal_mgr->handle_signal(signum);
+  }
+}
 
 int Mgr::main(vector<const char *> args)
 {
-  return py_modules.main(args);
+  py_modules.start();
+
+  // Enable signal handlers
+  signal_mgr = this;
+  init_async_signal_handler();
+  register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
+  register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
+
+  client_messenger->wait();
+
+  // Disable signal handlers
+  unregister_async_signal_handler(SIGINT, handle_mgr_signal);
+  unregister_async_signal_handler(SIGTERM, handle_mgr_signal);
+  shutdown_async_signal_handler();
+  signal_mgr = nullptr;
 }
 
 
index 4a2ce779c8ff8d2c57f66647b2aad7c157fb2b6f..e85358f06b7d63aa67ee1b87c89197991a221716 100644 (file)
@@ -81,6 +81,7 @@ public:
   void shutdown();
   void usage() {}
   int main(vector<const char *> args);
+  void handle_signal(int signum);
 };
 
 #endif /* MDS_UTILITY_H_ */
index c1dee9f1968c4da7a7e844985110039065004e4f..4e8d0d3bf64e59f8b661a498f5af17bb64b90249 100644 (file)
@@ -93,6 +93,27 @@ int MgrPyModule::serve()
   return 0;
 }
 
+// FIXME: DRY wrt serve
+void MgrPyModule::shutdown()
+{
+  assert(pClassInstance != nullptr);
+
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+
+  auto pValue = PyObject_CallMethod(pClassInstance,
+      (const char*)"shutdown", (const char*)"()");
+
+  if (pValue != NULL) {
+    Py_DECREF(pValue);
+  } else {
+    PyErr_Print();
+    derr << "Failed to invoke shutdown() on " << module_name << dendl;
+  }
+
+  PyGILState_Release(gstate);
+}
+
 void MgrPyModule::notify(const std::string &notify_type, const std::string &notify_id)
 {
   assert(pClassInstance != nullptr);
index 8a5dc94edef788ff7012d1744b01e87246dcf65d..dd490d7e7402d38a3cb3505a0faab74a71e1f1f6 100644 (file)
@@ -46,7 +46,6 @@ private:
   PyObject *pClass;
   PyObject *pClassInstance;
 
-
   std::vector<ModuleCommand> commands;
 
   int load_commands();
@@ -57,6 +56,7 @@ public:
 
   int load();
   int serve();
+  void shutdown();
   void notify(const std::string &notify_type, const std::string &notify_id);
 
   const std::vector<ModuleCommand> &get_commands() const
@@ -64,6 +64,11 @@ public:
     return commands;
   }
 
+  const std::string &get_name() const
+  {
+    return module_name;
+  }
+
   int handle_command(
     const cmdmap_t &cmdmap,
     std::stringstream *ss,
index f4b25f72e1b24f13dfbbc3759d0b0be6cce77edf..40bc68f9cee6cb04ce391d4eee709ad976d44090 100644 (file)
@@ -12,6 +12,9 @@
  */
 
 
+#include <boost/tokenizer.hpp>
+#include "common/errno.h"
+
 #include "PyState.h"
 #include "PyFormatter.h"
 
@@ -212,8 +215,11 @@ std::string handle_pyerror()
     return extract<std::string>(formatted);
 }
 
-int PyModules::main(vector<const char *> args)
+
+int PyModules::init()
 {
+  Mutex::Locker locker(lock);
+
   global_handle = this;
 
   // Set up global python interpreter
@@ -252,53 +258,103 @@ int PyModules::main(vector<const char *> args)
   }
 
   // Load python code
-  // TODO load mgr_modules list, run them all in a thread each.
-  auto mod = new MgrPyModule("rest");
-  int r = mod->load();
-  if (r != 0) {
-    derr << "Error loading python module" << dendl;
-    derr << handle_pyerror() << dendl;
-#if 0
-    PyObject *ptype, *pvalue, *ptraceback;
-    PyErr_Fetch(&ptype, &pvalue, &ptraceback);
-    if (ptype) {
-      if (pvalue) {
-        char *pStrErrorMessage = PyString_AsString(pvalue);
-
-XXX why is pvalue giving null when converted to string?
-
-        assert(pStrErrorMessage != nullptr);
-        derr << "Exception: " << pStrErrorMessage << dendl;
-        Py_DECREF(ptraceback);
-        Py_DECREF(pvalue);
-      }
-      Py_DECREF(ptype);
+  boost::tokenizer<> tok(g_conf->mgr_modules);
+  for(boost::tokenizer<>::iterator module_name=tok.begin();
+      module_name != tok.end();++module_name){
+    dout(1) << "Loading python module '" << *module_name << "'" << dendl;
+    auto mod = new MgrPyModule(*module_name);
+    int r = mod->load();
+    if (r != 0) {
+      derr << "Error loading module '" << *module_name << "': "
+        << cpp_strerror(r) << dendl;
+      derr << handle_pyerror() << dendl;
+
+      return r;
+    } else {
+      // Success!
+      modules[*module_name] = mod;
     }
+  } 
+
+  // Drop the GIL
+#if 1
+  PyThreadState *tstate = PyEval_SaveThread();
+#else
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+  PyGILState_Release(gstate);
 #endif
+  
+  return 0;
+}
 
-    // FIXME: be tolerant of bad modules, log an error and continue
-    // to load other, healthy modules.
-    return r;
-  }
+class ServeThread : public Thread
+{
+  MgrPyModule *mod;
+
+public:
+  ServeThread(MgrPyModule *mod_)
+    : mod(mod_) {}
+
+  void *entry()
   {
-    Mutex::Locker locker(lock);
-    modules["rest"] = mod;
-  }
+    PyGILState_STATE gstate;
+    gstate = PyGILState_Ensure();
 
-  // Execute python server
-  mod->serve();
+    dout(4) << "Entering thread for " << mod->get_name() << dendl;
+    mod->serve();
+
+    PyGILState_Release(gstate);
+
+    return nullptr;
+  }
+};
 
+void PyModules::start()
+{
   {
-    Mutex::Locker locker(lock);
-    // Tear down modules
-    for (auto i : modules) {
-      delete i.second;
+    Mutex::Locker l(lock);
+    for (auto &i : modules) {
+      auto thread = new ServeThread(i.second);
+      serve_threads[i.first] = thread;
     }
-    modules.clear();
   }
 
+  for (auto &i : serve_threads) {
+    std::ostringstream thread_name;
+    thread_name << "mgr." << i.first;
+    dout(4) << "Starting thread for " << i.first << dendl;
+    i.second->create(thread_name.str().c_str());
+  }
+}
+
+void PyModules::shutdown()
+{
+  Mutex::Locker locker(lock);
+
+  // Signal modules to drop out of serve()
+  for (auto i : modules) {
+    auto module = i.second;
+    finisher.queue(new C_StdFunction([module](){
+      module->shutdown();
+    }));
+  }
+
+  for (auto &i : serve_threads) {
+    lock.Unlock();
+    i.second->join();
+    lock.Lock();
+    delete i.second;
+  }
+  serve_threads.clear();
+
+  // Tear down modules
+  for (auto i : modules) {
+    delete i.second;
+  }
+  modules.clear();
+
   Py_Finalize();
-  return 0;
 }
 
 void PyModules::notify_all(const std::string &notify_type,
@@ -320,6 +376,10 @@ void PyModules::notify_all(const std::string &notify_type,
 bool PyModules::get_config(const std::string &handle,
     const std::string &key, std::string *val) const
 {
+  PyThreadState *tstate = PyEval_SaveThread();
+  Mutex::Locker l(lock);
+  PyEval_RestoreThread(tstate);
+
   const std::string global_key = config_prefix + handle + "." + key;
 
   if (config_cache.count(global_key)) {
@@ -335,20 +395,25 @@ void PyModules::set_config(const std::string &handle,
 {
   const std::string global_key = config_prefix + handle + "." + key;
 
-  config_cache[global_key] = val;
-
-  std::ostringstream cmd_json;
   Command set_cmd;
-
-  JSONFormatter jf;
-  jf.open_object_section("cmd");
-  jf.dump_string("prefix", "config-key put");
-  jf.dump_string("key", global_key);
-  jf.dump_string("val", val);
-  jf.close_section();
-  jf.flush(cmd_json);
-
-  set_cmd.run(&monc, cmd_json.str());
+  {
+    PyThreadState *tstate = PyEval_SaveThread();
+    Mutex::Locker l(lock);
+    PyEval_RestoreThread(tstate);
+    config_cache[global_key] = val;
+
+    std::ostringstream cmd_json;
+
+    JSONFormatter jf;
+    jf.open_object_section("cmd");
+    jf.dump_string("prefix", "config-key put");
+    jf.dump_string("key", global_key);
+    jf.dump_string("val", val);
+    jf.close_section();
+    jf.flush(cmd_json);
+
+    set_cmd.run(&monc, cmd_json.str());
+  }
   set_cmd.wait();
 
   // FIXME: is config-key put ever allowed to fail?
@@ -374,6 +439,8 @@ std::vector<ModuleCommand> PyModules::get_commands()
 void PyModules::insert_config(const std::map<std::string,
                               std::string> &new_config)
 {
+  Mutex::Locker l(lock);
+
   dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
   config_cache = new_config;
 }
index 93e373409158dd287e1ff2624d624714ed155ed1..31c391888e3512341b44fa36496c79508e36b219 100644 (file)
 
 #include "common/Finisher.h"
 #include "common/Mutex.h"
-
+#include "common/Thread.h"
 
 #include "DaemonState.h"
 #include "ClusterState.h"
 
-
-
-
+class ServeThread;
 
 class PyModules
 {
   protected:
   std::map<std::string, MgrPyModule*> modules;
+  std::map<std::string, ServeThread*> serve_threads;
 
   DaemonStateIndex &daemon_state;
   ClusterState &cluster_state;
   MonClient &monc;
   Finisher &finisher;
 
-  Mutex lock;
+  mutable Mutex lock;
 
 public:
   static constexpr auto config_prefix = "mgr.";
@@ -65,15 +64,14 @@ public:
   void insert_config(const std::map<std::string, std::string> &new_config);
 
   // Public so that MonCommandCompletion can use it
-  // FIXME: bit weird that we're sending command completions
-  // to all modules (we rely on them to ignore anything that
-  // they don't recognise), but when we get called from
-  // python-land we don't actually know who we are.  Need
-  // to give python-land a handle in initialisation.
+  // FIXME: for send_command completion notifications,
+  // send it to only the module that sent the command, not everyone
   void notify_all(const std::string &notify_type,
                   const std::string &notify_id);
 
-  int main(std::vector<const char *> args);
+  int init();
+  void start();
+  void shutdown();
 
   void dump_server(const std::string &hostname,
                    const DaemonStateCollection &dmc,