]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: refactor python module management
authorJohn Spray <john.spray@redhat.com>
Mon, 14 Aug 2017 10:31:18 +0000 (06:31 -0400)
committerJohn Spray <john.spray@redhat.com>
Wed, 1 Nov 2017 12:20:20 +0000 (08:20 -0400)
Separate out the *loading* of modules from
the *running* of modules.

This is a precursor to enabling modules to run
in standby mode.

Signed-off-by: John Spray <john.spray@redhat.com>
15 files changed:
src/CMakeLists.txt
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/Mgr.cc
src/mgr/Mgr.h
src/mgr/MgrPyModule.cc
src/mgr/MgrPyModule.h
src/mgr/MgrStandby.cc
src/mgr/MgrStandby.h
src/mgr/PyModuleRegistry.cc [new file with mode: 0644]
src/mgr/PyModuleRegistry.h [new file with mode: 0644]
src/mgr/PyModules.cc
src/mgr/PyModules.h
src/mgr/PyState.cc
src/mgr/PyState.h

index 7c09980b6a493d0cf72ba0a3f87450d0ba069f2a..663250134c4119d39bfb7997fe2e5ef1b7ed10fb 100644 (file)
@@ -713,6 +713,7 @@ if (WITH_MGR)
       mgr/DaemonServer.cc
       mgr/ClusterState.cc
       mgr/PyModules.cc
+      mgr/PyModuleRegistry.cc
       mgr/PyFormatter.cc
       mgr/PyOSDMap.cc
       mgr/PyState.cc
index bdb3cc96a7c1d80f4dc50c63f83ab7aef27c100b..42132d726022851bc1233cf4fe5f16d7689651a7 100644 (file)
@@ -41,7 +41,7 @@ DaemonServer::DaemonServer(MonClient *monc_,
                            Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
                           ClusterState &cluster_state_,
-                          PyModules &py_modules_,
+                          PyModuleRegistry &py_modules_,
                           LogChannelRef clog_,
                           LogChannelRef audit_clog_)
     : Dispatcher(g_ceph_context),
@@ -1287,7 +1287,7 @@ bool DaemonServer::handle_command(MCommand *m)
   }
 
   // None of the special native commands, 
-  MgrPyModule *handler = nullptr;
+  ActivePyModule *handler = nullptr;
   auto py_commands = py_modules.get_py_commands();
   for (const auto &pyc : py_commands) {
     auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
index a3ac2cad6ddeee238067981ffdac645d0e62774c..fe809833cbd800ce51023672cb7adc0013778e55 100644 (file)
@@ -14,7 +14,7 @@
 #ifndef DAEMON_SERVER_H_
 #define DAEMON_SERVER_H_
 
-#include "PyModules.h"
+#include "PyModuleRegistry.h"
 
 #include <set>
 #include <string>
@@ -59,7 +59,7 @@ protected:
   Finisher  &finisher;
   DaemonStateIndex &daemon_state;
   ClusterState &cluster_state;
-  PyModules &py_modules;
+  PyModuleRegistry &py_modules;
   LogChannelRef clog, audit_clog;
 
   AuthAuthorizeHandlerRegistry auth_registry;
@@ -109,7 +109,7 @@ public:
                Finisher &finisher_,
               DaemonStateIndex &daemon_state_,
               ClusterState &cluster_state_,
-              PyModules &py_modules_,
+              PyModuleRegistry &py_modules_,
               LogChannelRef cl,
               LogChannelRef auditcl);
   ~DaemonServer() override;
index 782710c9ae2e90a6db7b289c68248f35ad70a43e..5416e0d12352750f9ce879bf34edfc812c78d1cc 100644 (file)
@@ -42,6 +42,7 @@
 
 
 Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
+         PyModuleRegistry *py_module_registry_,
         Messenger *clientm_, Objecter *objecter_,
         Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
   monc(monc_),
@@ -52,11 +53,12 @@ Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
   timer(g_ceph_context, lock),
   finisher(g_ceph_context, "Mgr", "mgr-fin"),
   digest_received(false),
-  py_modules(daemon_state, cluster_state, *monc, clog_, *objecter, *client,
-             finisher),
+  py_module_registry(py_module_registry_),
   cluster_state(monc, nullptr, mgrmap),
-  server(monc, finisher, daemon_state, cluster_state, py_modules,
+  server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
          clog_, audit_clog_),
+  clog(clog_),
+  audit_clog(audit_clog_),
   initialized(false),
   initializing(false)
 {
@@ -232,7 +234,7 @@ void Mgr::init()
   // Preload config keys (`get` for plugins is to be a fast local
   // operation, we we don't have to synchronize these later because
   // all sets will come via mgr)
-  load_config();
+  auto loaded_config = load_config();
 
   // Wait for MgrDigest...
   dout(4) << "waiting for MgrDigest..." << dendl;
@@ -241,9 +243,9 @@ void Mgr::init()
   }
 
   // assume finisher already initialized in background_init
-  dout(4) << "starting PyModules..." << dendl;
-  py_modules.init();
-  py_modules.start();
+  dout(4) << "starting python modules..." << dendl;
+  py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
+      clog, *objecter, *client, finisher);
 
   dout(4) << "Complete." << dendl;
   initializing = false;
@@ -339,7 +341,7 @@ void Mgr::load_all_metadata()
   }
 }
 
-void Mgr::load_config()
+std::map<std::string, std::string> Mgr::load_config()
 {
   assert(lock.is_locked_by_me());
 
@@ -357,7 +359,7 @@ void Mgr::load_config()
     std::string const key = key_str.get_str();
     dout(20) << "saw key '" << key << "'" << dendl;
 
-    const std::string config_prefix = PyModules::config_prefix;
+    const std::string config_prefix = PyModuleRegistry::config_prefix;
 
     if (key.substr(0, config_prefix.size()) == config_prefix) {
       dout(20) << "fetching '" << key << "'" << dendl;
@@ -373,7 +375,7 @@ void Mgr::load_config()
     }
   }
 
-  py_modules.insert_config(loaded);
+  return loaded;
 }
 
 void Mgr::shutdown()
@@ -389,7 +391,7 @@ void Mgr::shutdown()
       server.shutdown();
     }
     // after the messenger is stopped, signal modules to shutdown via finisher
-    py_modules.shutdown();
+    py_module_registry->shutdown();
   }));
 
   // Then stop the finisher to ensure its enqueued contexts aren't going
@@ -472,7 +474,7 @@ void Mgr::handle_osd_map()
 void Mgr::handle_log(MLog *m)
 {
   for (const auto &e : m->entries) {
-    py_modules.notify_all(e);
+    py_module_registry->notify_all(e);
   }
 
   m->put();
@@ -495,18 +497,18 @@ bool Mgr::ms_dispatch(Message *m)
       handle_mgr_digest(static_cast<MMgrDigest*>(m));
       break;
     case CEPH_MSG_MON_MAP:
-      py_modules.notify_all("mon_map", "");
+      py_module_registry->notify_all("mon_map", "");
       m->put();
       break;
     case CEPH_MSG_FS_MAP:
-      py_modules.notify_all("fs_map", "");
+      py_module_registry->notify_all("fs_map", "");
       handle_fs_map((MFSMap*)m);
       return false; // I shall let this pass through for Client
       break;
     case CEPH_MSG_OSD_MAP:
       handle_osd_map();
 
-      py_modules.notify_all("osd_map", "");
+      py_module_registry->notify_all("osd_map", "");
 
       // Continuous subscribe, so that we can generate notifications
       // for our MgrPyModules
@@ -515,7 +517,7 @@ bool Mgr::ms_dispatch(Message *m)
       break;
     case MSG_SERVICE_MAP:
       handle_service_map((MServiceMap*)m);
-      py_modules.notify_all("service_map", "");
+      py_module_registry->notify_all("service_map", "");
       m->put();
       break;
     case MSG_LOG:
@@ -626,12 +628,12 @@ void Mgr::handle_mgr_digest(MMgrDigest* m)
   dout(10) << m->mon_status_json.length() << dendl;
   dout(10) << m->health_json.length() << dendl;
   cluster_state.load_digest(m);
-  py_modules.notify_all("mon_status", "");
-  py_modules.notify_all("health", "");
+  py_module_registry->notify_all("mon_status", "");
+  py_module_registry->notify_all("health", "");
 
   // Hack: use this as a tick/opportunity to prompt python-land that
   // the pgmap might have changed since last time we were here.
-  py_modules.notify_all("pg_summary", "");
+  py_module_registry->notify_all("pg_summary", "");
   dout(10) << "done." << dendl;
 
   m->put();
@@ -653,7 +655,7 @@ std::vector<MonCommand> Mgr::get_command_set() const
   Mutex::Locker l(lock);
 
   std::vector<MonCommand> commands = mgr_commands;
-  std::vector<MonCommand> py_commands = py_modules.get_commands();
+  std::vector<MonCommand> py_commands = py_module_registry->get_commands();
   commands.insert(commands.end(), py_commands.begin(), py_commands.end());
   return commands;
 }
@@ -662,6 +664,6 @@ std::map<std::string, std::string> Mgr::get_services() const
 {
   Mutex::Locker l(lock);
 
-  return py_modules.get_services();
+  return py_module_registry->get_services();
 }
 
index e37b1b9bee6f6020d583e5566aa7aad334539ea7..9a6b3974b2915ad20930c7688f05064618129eee 100644 (file)
@@ -32,7 +32,7 @@
 #include "mon/MgrMap.h"
 
 #include "DaemonServer.h"
-#include "PyModules.h"
+#include "PyModuleRegistry.h"
 
 #include "DaemonState.h"
 #include "ClusterState.h"
@@ -44,8 +44,6 @@ class MServiceMap;
 class Objecter;
 class Client;
 
-class MgrPyModule;
-
 class Mgr {
 protected:
   MonClient *monc;
@@ -62,13 +60,16 @@ protected:
   bool digest_received;
   Cond digest_cond;
 
-  PyModules py_modules;
+  PyModuleRegistry *py_module_registry;
   DaemonStateIndex daemon_state;
   ClusterState cluster_state;
 
   DaemonServer server;
 
-  void load_config();
+  LogChannelRef clog;
+  LogChannelRef audit_clog;
+
+  PyModuleConfig load_config();
   void load_all_metadata();
   void init();
 
@@ -77,6 +78,7 @@ protected:
 
 public:
   Mgr(MonClient *monc_, const MgrMap& mgrmap,
+      PyModuleRegistry *py_module_registry_,
       Messenger *clientm_, Objecter *objecter_,
       Client *client_, LogChannelRef clog_, LogChannelRef audit_clog_);
   ~Mgr();
index e3e3160de130ee84fbecfee2a384d319459802d2..268b32e3bd8b32232097ddb3aa92a112ac1688f1 100644 (file)
@@ -24,6 +24,9 @@
 #include <boost/python.hpp>
 #include "include/assert.h"  // boost clobbers this
 
+
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
 // decode a Python exception into a string
 std::string handle_pyerror()
 {
@@ -46,96 +49,28 @@ std::string handle_pyerror()
     return extract<std::string>(formatted);
 }
 
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_mgr
-
-#undef dout_prefix
-#define dout_prefix *_dout << "mgr[py] "
-
-namespace {
-  PyObject* log_write(PyObject*, PyObject* args) {
-    char* m = nullptr;
-    if (PyArg_ParseTuple(args, "s", &m)) {
-      auto len = strlen(m);
-      if (len && m[len-1] == '\n') {
-       m[len-1] = '\0';
-      }
-      dout(4) << m << dendl;
-    }
-    Py_RETURN_NONE;
-  }
 
-  PyObject* log_flush(PyObject*, PyObject*){
-    Py_RETURN_NONE;
-  }
 
-  static PyMethodDef log_methods[] = {
-    {"write", log_write, METH_VARARGS, "write stdout and stderr"},
-    {"flush", log_flush, METH_VARARGS, "flush"},
-    {nullptr, nullptr, 0, nullptr}
-  };
-}
 
-#undef dout_prefix
-#define dout_prefix *_dout << "mgr " << __func__ << " "
+void *ServeThread::entry()
+{
+  // No need to acquire the GIL here; the module does it.
+  dout(4) << "Entering thread for " << mod->get_name() << dendl;
+  mod->serve();
+  return nullptr;
+}
 
-MgrPyModule::MgrPyModule(const std::string &module_name_, const std::string &sys_path, PyThreadState *main_ts_)
+ActivePyModule::ActivePyModule(const std::string &module_name_,
+                               PyObject *pClass_,
+                               PyThreadState *my_ts_)
   : module_name(module_name_),
-    pClassInstance(nullptr),
-    pMainThreadState(main_ts_)
+    pClass(pClass_),
+    pMyThreadState(my_ts_),
+    thread(this)
 {
-  Gil gil(pMainThreadState);
-
-  auto thread_state = Py_NewInterpreter();
-  if (thread_state == nullptr) {
-    derr << "Failed to create python sub-interpreter for '" << module_name << '"' << dendl;
-  } else {
-    pMyThreadState.set(thread_state);
-
-    // Some python modules do not cope with an unpopulated argv, so lets
-    // fake one.  This step also picks up site-packages into sys.path.
-    const char *argv[] = {"ceph-mgr"};
-    PySys_SetArgv(1, (char**)argv);
-
-    if (g_conf->daemonize) {
-      auto py_logger = Py_InitModule("ceph_logger", log_methods);
-#if PY_MAJOR_VERSION >= 3
-      PySys_SetObject("stderr", py_logger);
-      PySys_SetObject("stdout", py_logger);
-#else
-      PySys_SetObject(const_cast<char*>("stderr"), py_logger);
-      PySys_SetObject(const_cast<char*>("stdout"), py_logger);
-#endif
-    }
-
-    PySys_SetPath(const_cast<char*>(sys_path.c_str()));
-
-    // Populate python namespace with callable hooks
-    Py_InitModule("ceph_osdmap", OSDMapMethods);
-    Py_InitModule("ceph_osdmap_incremental", OSDMapIncrementalMethods);
-    Py_InitModule("ceph_crushmap", CRUSHMapMethods);
-
-    PyMethodDef ModuleMethods[] = {
-      {nullptr}
-    };
-
-    // Initialize module
-    PyObject *ceph_module = Py_InitModule("ceph_module", ModuleMethods);
-    assert(ceph_module != nullptr);
-
-    // Initialize base class
-    BaseMgrModuleType.tp_new = PyType_GenericNew;
-    if (PyType_Ready(&BaseMgrModuleType) < 0) {
-        assert(0);
-    }
-
-    Py_INCREF(&BaseMgrModuleType);
-    PyModule_AddObject(ceph_module, "BaseMgrModule",
-                       (PyObject *)&BaseMgrModuleType);
-  }
 }
 
-MgrPyModule::~MgrPyModule()
+ActivePyModule::~ActivePyModule()
 {
   if (pMyThreadState.ts != nullptr) {
     Gil gil(pMyThreadState);
@@ -168,45 +103,17 @@ MgrPyModule::~MgrPyModule()
   }
 }
 
-int MgrPyModule::load(PyModules *py_modules)
+int ActivePyModule::load(ActivePyModules *py_modules)
 {
-  if (pMyThreadState.ts == nullptr) {
-    derr << "No python sub-interpreter exists for module '" << module_name << "'" << dendl;
-    return -EINVAL;
-  }
-
   Gil gil(pMyThreadState);
 
-  // Load the module
-  PyObject *pName = PyString_FromString(module_name.c_str());
-  auto pModule = PyImport_Import(pName);
-  Py_DECREF(pName);
-  if (pModule == nullptr) {
-    derr << "Module not found: '" << module_name << "'" << dendl;
-    derr << handle_pyerror() << dendl;
-
-    assert(0);
-    return -ENOENT;
-  }
-
-  // Find the class
-  // TODO: let them call it what they want instead of just 'Module'
-  auto pClass = PyObject_GetAttrString(pModule, (const char*)"Module");
-  Py_DECREF(pModule);
-  if (pClass == nullptr) {
-    derr << "Class not found in module '" << module_name << "'" << dendl;
-    derr << handle_pyerror() << dendl;
-    return -EINVAL;
-  }
-  
   // We tell the module how we name it, so that it can be consistent
   // with us in logging etc.
-  
   auto pThisPtr = PyCapsule_New(this, nullptr, nullptr);
   auto pPyModules = PyCapsule_New(py_modules, nullptr, nullptr);
-  
   auto pModuleName = PyString_FromString(module_name.c_str());
   auto pArgs = PyTuple_Pack(3, pModuleName, pPyModules, pThisPtr);
+
   pClassInstance = PyObject_CallObject(pClass, pArgs);
   Py_DECREF(pClass);
   Py_DECREF(pModuleName);
@@ -222,7 +129,7 @@ int MgrPyModule::load(PyModules *py_modules)
   return load_commands();
 }
 
-int MgrPyModule::serve()
+int ActivePyModule::serve()
 {
   assert(pClassInstance != nullptr);
 
@@ -245,8 +152,7 @@ int MgrPyModule::serve()
   return r;
 }
 
-// FIXME: DRY wrt serve
-void MgrPyModule::shutdown()
+void ActivePyModule::shutdown()
 {
   assert(pClassInstance != nullptr);
 
@@ -263,7 +169,7 @@ void MgrPyModule::shutdown()
   }
 }
 
-void MgrPyModule::notify(const std::string &notify_type, const std::string &notify_id)
+void ActivePyModule::notify(const std::string &notify_type, const std::string &notify_id)
 {
   assert(pClassInstance != nullptr);
 
@@ -286,7 +192,7 @@ void MgrPyModule::notify(const std::string &notify_type, const std::string &noti
   }
 }
 
-void MgrPyModule::notify_clog(const LogEntry &log_entry)
+void ActivePyModule::notify_clog(const LogEntry &log_entry)
 {
   assert(pClassInstance != nullptr);
 
@@ -314,9 +220,9 @@ void MgrPyModule::notify_clog(const LogEntry &log_entry)
   }
 }
 
-int MgrPyModule::load_commands()
+int ActivePyModule::load_commands()
 {
-  // Don't need a Gil here -- this is called from MgrPyModule::load(),
+  // Don't need a Gil here -- this is called from ActivePyModule::load(),
   // which already has one.
   PyObject *command_list = PyObject_GetAttrString(pClassInstance, "COMMANDS");
   assert(command_list != nullptr);
@@ -352,7 +258,7 @@ int MgrPyModule::load_commands()
   return 0;
 }
 
-int MgrPyModule::handle_command(
+int ActivePyModule::handle_command(
   const cmdmap_t &cmdmap,
   std::stringstream *ds,
   std::stringstream *ss)
@@ -391,7 +297,7 @@ int MgrPyModule::handle_command(
   return r;
 }
 
-void MgrPyModule::get_health_checks(health_check_map_t *checks)
+void ActivePyModule::get_health_checks(health_check_map_t *checks)
 {
   checks->merge(health_checks);
 }
index bb1dc5b62146762d0da888736cc1a63f306eddd8..cab20a762569586167a174ac896da7c7322c9ccf 100644 (file)
@@ -22,6 +22,7 @@
 #include "common/cmdparse.h"
 #include "common/LogEntry.h"
 #include "common/Mutex.h"
+#include "common/Thread.h"
 #include "mon/health_check.h"
 #include "mgr/Gil.h"
 
@@ -29,8 +30,8 @@
 #include <string>
 
 
-class MgrPyModule;
-class PyModules;
+class ActivePyModule;
+class ActivePyModules;
 
 /**
  * A Ceph CLI command description provided from a Python module
@@ -40,16 +41,34 @@ public:
   std::string cmdstring;
   std::string helpstring;
   std::string perm;
-  MgrPyModule *handler;
+  ActivePyModule *handler;
 };
 
-class MgrPyModule
+class ServeThread : public Thread
+{
+  ActivePyModule *mod;
+
+public:
+  ServeThread(ActivePyModule *mod_)
+    : mod(mod_) {}
+
+  void *entry() override;
+};
+
+class ActivePyModule
 {
 private:
   const std::string module_name;
-  PyObject *pClassInstance;
-  SafeThreadState pMainThreadState;
-  SafeThreadState pMyThreadState;
+
+  // Passed in by whoever loaded our python module and looked up
+  // the symbols in it.
+  PyObject *pClass = nullptr;
+
+  // Passed in by whoever created our subinterpreter for us
+  SafeThreadState pMyThreadState = nullptr;
+
+  // Populated when we construct our instance of pClass in load()
+  PyObject *pClassInstance = nullptr;
 
   health_check_map_t health_checks;
 
@@ -61,10 +80,14 @@ private:
   std::string uri;
 
 public:
-  MgrPyModule(const std::string &module_name, const std::string &sys_path, PyThreadState *main_ts);
-  ~MgrPyModule();
+  ActivePyModule(const std::string &module_name,
+      PyObject *pClass_,
+      PyThreadState *my_ts);
+  ~ActivePyModule();
+
+  ServeThread thread;
 
-  int load(PyModules *py_modules);
+  int load(ActivePyModules *py_modules);
   int serve();
   void shutdown();
   void notify(const std::string &notify_type, const std::string &notify_id);
index 3fb663a7db92cb7fd0487518469fbd29db486cf0..323d3a58458172bb799b567e20bef08d42cfe330 100644 (file)
@@ -46,6 +46,7 @@ MgrStandby::MgrStandby(int argc, const char **argv) :
   audit_clog(log_client.create_channel(CLOG_CHANNEL_AUDIT)),
   lock("MgrStandby::lock"),
   timer(g_ceph_context, lock),
+  py_module_registry(clog),
   active_mgr(nullptr),
   orig_argc(argc),
   orig_argv(argv),
@@ -151,7 +152,7 @@ void MgrStandby::send_beacon()
   dout(1) << state_str() << dendl;
 
   set<string> modules;
-  PyModules::list_modules(&modules);
+  PyModuleRegistry::list_modules(&modules);
 
   // Whether I think I am available (request MgrMonitor to set me
   // as available in the map)
@@ -217,6 +218,8 @@ void MgrStandby::shutdown()
   // Expect already to be locked as we're called from signal handler
   assert(lock.is_locked_by_me());
 
+  py_module_registry.shutdown();
+
   // stop sending beacon first, i use monc to talk with monitors
   timer.shutdown();
   // client uses monc and objecter
@@ -306,10 +309,24 @@ void MgrStandby::handle_mgr_map(MMgrMap* mmap)
   const bool active_in_map = map.active_gid == monc.get_global_id();
   dout(4) << "active in map: " << active_in_map
           << " active is " << map.active_gid << dendl;
+
+  if (!py_module_registry.is_initialized()) {
+    int r = py_module_registry.init(map);
+
+    // FIXME: error handling
+    assert(r == 0);
+  } else {
+    bool need_respawn = py_module_registry.handle_mgr_map(map);
+    if (need_respawn) {
+      respawn();
+    }
+  }
+
   if (active_in_map) {
     if (!active_mgr) {
       dout(1) << "Activating!" << dendl;
-      active_mgr.reset(new Mgr(&monc, map, client_messenger.get(), &objecter,
+      active_mgr.reset(new Mgr(&monc, map, &py_module_registry,
+                               client_messenger.get(), &objecter,
                               &client, clog, audit_clog));
       active_mgr->background_init(new FunctionContext(
             [this](int r){
index e24f175cada4090845e42157e26d7313f88fa304..a64fd7e99958c4f164dd9fe3aee6e8d6fc30fb46 100644 (file)
@@ -23,6 +23,7 @@
 #include "client/Client.h"
 #include "mon/MonClient.h"
 #include "osdc/Objecter.h"
+#include "PyModuleRegistry.h"
 
 
 class MMgrMap;
@@ -48,6 +49,7 @@ protected:
   Mutex lock;
   SafeTimer timer;
 
+  PyModuleRegistry py_module_registry;
   std::shared_ptr<Mgr> active_mgr;
 
   int orig_argc;
diff --git a/src/mgr/PyModuleRegistry.cc b/src/mgr/PyModuleRegistry.cc
new file mode 100644 (file)
index 0000000..42c3fdf
--- /dev/null
@@ -0,0 +1,338 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#include "include/stringify.h"
+#include "common/errno.h"
+
+#include "PyState.h"
+#include "Gil.h"
+
+#include "PyModuleRegistry.h"
+
+// definition for non-const static member
+std::string PyModuleRegistry::config_prefix;
+
+
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[py] "
+
+namespace {
+  PyObject* log_write(PyObject*, PyObject* args) {
+    char* m = nullptr;
+    if (PyArg_ParseTuple(args, "s", &m)) {
+      auto len = strlen(m);
+      if (len && m[len-1] == '\n') {
+       m[len-1] = '\0';
+      }
+      dout(4) << m << dendl;
+    }
+    Py_RETURN_NONE;
+  }
+
+  PyObject* log_flush(PyObject*, PyObject*){
+    Py_RETURN_NONE;
+  }
+
+  static PyMethodDef log_methods[] = {
+    {"write", log_write, METH_VARARGS, "write stdout and stderr"},
+    {"flush", log_flush, METH_VARARGS, "flush"},
+    {nullptr, nullptr, 0, nullptr}
+  };
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+
+
+std::string PyModule::get_site_packages()
+{
+  std::stringstream site_packages;
+
+  // CPython doesn't auto-add site-packages dirs to sys.path for us,
+  // but it does provide a module that we can ask for them.
+  auto site_module = PyImport_ImportModule("site");
+  assert(site_module);
+
+  auto site_packages_fn = PyObject_GetAttrString(site_module, "getsitepackages");
+  if (site_packages_fn != nullptr) {
+    auto site_packages_list = PyObject_CallObject(site_packages_fn, nullptr);
+    assert(site_packages_list);
+
+    auto n = PyList_Size(site_packages_list);
+    for (Py_ssize_t i = 0; i < n; ++i) {
+      if (i != 0) {
+        site_packages << ":";
+      }
+      site_packages << PyString_AsString(PyList_GetItem(site_packages_list, i));
+    }
+
+    Py_DECREF(site_packages_list);
+    Py_DECREF(site_packages_fn);
+  } else {
+    // Fall back to generating our own site-packages paths by imitating
+    // what the standard site.py does.  This is annoying but it lets us
+    // run inside virtualenvs :-/
+
+    auto site_packages_fn = PyObject_GetAttrString(site_module, "addsitepackages");
+    assert(site_packages_fn);
+
+    auto known_paths = PySet_New(nullptr);
+    auto pArgs = PyTuple_Pack(1, known_paths);
+    PyObject_CallObject(site_packages_fn, pArgs);
+    Py_DECREF(pArgs);
+    Py_DECREF(known_paths);
+    Py_DECREF(site_packages_fn);
+
+    auto sys_module = PyImport_ImportModule("sys");
+    assert(sys_module);
+    auto sys_path = PyObject_GetAttrString(sys_module, "path");
+    assert(sys_path);
+
+    dout(1) << "sys.path:" << dendl;
+    auto n = PyList_Size(sys_path);
+    bool first = true;
+    for (Py_ssize_t i = 0; i < n; ++i) {
+      dout(1) << "  " << PyString_AsString(PyList_GetItem(sys_path, i)) << dendl;
+      if (first) {
+        first = false;
+      } else {
+        site_packages << ":";
+      }
+      site_packages << PyString_AsString(PyList_GetItem(sys_path, i));
+    }
+
+    Py_DECREF(sys_path);
+    Py_DECREF(sys_module);
+  }
+
+  Py_DECREF(site_module);
+
+  return site_packages.str();
+}
+
+int PyModuleRegistry::init(const MgrMap &map)
+{
+  Mutex::Locker locker(lock);
+
+  // Don't try and init me if you don't really have a map
+  assert(map.epoch > 0);
+
+  mgr_map = map;
+
+  // namespace in config-key prefixed by "mgr/"
+  config_prefix = std::string(g_conf->name.get_type_str()) + "/";
+
+  // Set up global python interpreter
+  Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));
+  Py_InitializeEx(0);
+
+  // Let CPython know that we will be calling it back from other
+  // threads in future.
+  if (! PyEval_ThreadsInitialized()) {
+    PyEval_InitThreads();
+  }
+
+  // Drop the GIL and remember the main thread state (current
+  // thread state becomes NULL)
+  pMainThreadState = PyEval_SaveThread();
+
+  std::list<std::string> failed_modules;
+
+  // Load python code
+  for (const auto& module_name : mgr_map.modules) {
+    dout(1) << "Loading python module '" << module_name << "'" << dendl;
+    auto mod = ceph::make_unique<PyModule>(module_name);
+    int r = mod->load(pMainThreadState);
+    if (r != 0) {
+      // Don't use handle_pyerror() here; we don't have the GIL
+      // or the right thread state (this is deliberate).
+      derr << "Error loading module '" << module_name << "': "
+        << cpp_strerror(r) << dendl;
+      failed_modules.push_back(module_name);
+      // Don't drop out here, load the other modules
+    } else {
+      // Success!
+      modules[module_name] = std::move(mod);
+    }
+  }
+
+  if (!failed_modules.empty()) {
+    clog->error() << "Failed to load ceph-mgr modules: " << joinify(
+        failed_modules.begin(), failed_modules.end(), std::string(", "));
+  }
+
+  return 0;
+}
+
+
+int PyModule::load(PyThreadState *pMainThreadState)
+{
+  assert(pMainThreadState != nullptr);
+
+  // Configure sub-interpreter and construct C++-generated python classes
+  {
+    Gil gil(pMainThreadState);
+
+    pMyThreadState = Py_NewInterpreter();
+
+    if (pMyThreadState == nullptr) {
+      derr << "Failed to create python sub-interpreter for '" << module_name << '"' << dendl;
+      return -EINVAL;
+    } else {
+      // Some python modules do not cope with an unpopulated argv, so lets
+      // fake one.  This step also picks up site-packages into sys.path.
+      const char *argv[] = {"ceph-mgr"};
+      PySys_SetArgv(1, (char**)argv);
+
+      if (g_conf->daemonize) {
+        auto py_logger = Py_InitModule("ceph_logger", log_methods);
+#if PY_MAJOR_VERSION >= 3
+        PySys_SetObject("stderr", py_logger);
+        PySys_SetObject("stdout", py_logger);
+#else
+        PySys_SetObject(const_cast<char*>("stderr"), py_logger);
+        PySys_SetObject(const_cast<char*>("stdout"), py_logger);
+#endif
+      }
+
+      // Configure sys.path to include mgr_module_path
+      std::string sys_path = std::string(Py_GetPath()) + ":" + get_site_packages()
+                             + ":" + g_conf->get_val<std::string>("mgr_module_path");
+      dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
+
+      PySys_SetPath(const_cast<char*>(sys_path.c_str()));
+    }
+
+    PyMethodDef ModuleMethods[] = {
+      {nullptr}
+    };
+
+    // Initialize module
+    PyObject *ceph_module = Py_InitModule("ceph_module", ModuleMethods);
+    assert(ceph_module != nullptr);
+
+    Py_InitModule("ceph_osdmap", OSDMapMethods);
+    Py_InitModule("ceph_osdmap_incremental", OSDMapIncrementalMethods);
+    Py_InitModule("ceph_crushmap", CRUSHMapMethods);
+
+    // Initialize base class
+    BaseMgrModuleType.tp_new = PyType_GenericNew;
+    if (PyType_Ready(&BaseMgrModuleType) < 0) {
+        assert(0);
+    }
+
+    Py_INCREF(&BaseMgrModuleType);
+    PyModule_AddObject(ceph_module, "BaseMgrModule",
+                       (PyObject *)&BaseMgrModuleType);
+  }
+
+  // Environment is all good, import the external module
+  {
+    Gil gil(pMyThreadState);
+
+    // Load the module
+    PyObject *pName = PyString_FromString(module_name.c_str());
+    auto pModule = PyImport_Import(pName);
+    Py_DECREF(pName);
+    if (pModule == nullptr) {
+      derr << "Module not found: '" << module_name << "'" << dendl;
+      derr << handle_pyerror() << dendl;
+
+      assert(0);
+      return -ENOENT;
+    }
+
+    // Find the class
+    // TODO: let them call it what they want instead of just 'Module'
+    pClass = PyObject_GetAttrString(pModule, (const char*)"Module");
+    Py_DECREF(pModule);
+    if (pClass == nullptr) {
+      derr << "Class not found in module '" << module_name << "'" << dendl;
+      derr << handle_pyerror() << dendl;
+      return -EINVAL;
+    }
+  }
+
+  return 0;
+} 
+
+void PyModuleRegistry::active_start(
+            PyModuleConfig &config_,
+            DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+            LogChannelRef clog_, Objecter &objecter_, Client &client_,
+            Finisher &f)
+{
+  Mutex::Locker locker(lock);
+
+  assert(active_modules == nullptr);
+
+  active_modules.reset(new ActivePyModules(
+              config_, ds, cs, mc, clog_, objecter_, client_, f));
+
+  for (const auto &i : modules) {
+    active_modules->start_one(i.first,
+            i.second->pClass,
+            i.second->pMyThreadState);
+  }
+}
+
+void PyModuleRegistry::shutdown()
+{
+  Mutex::Locker locker(lock);
+  if (active_modules != nullptr) {
+    active_modules->shutdown();
+    active_modules.reset();
+  }
+
+  modules.clear();
+
+  PyEval_RestoreThread(pMainThreadState);
+  Py_Finalize();
+}
+
+static void _list_modules(
+  const std::string path,
+  std::set<std::string> *modules)
+{
+  DIR *dir = opendir(path.c_str());
+  if (!dir) {
+    return;
+  }
+  struct dirent *entry = NULL;
+  while ((entry = readdir(dir)) != NULL) {
+    string n(entry->d_name);
+    string fn = path + "/" + n;
+    struct stat st;
+    int r = ::stat(fn.c_str(), &st);
+    if (r == 0 && S_ISDIR(st.st_mode)) {
+      string initfn = fn + "/module.py";
+      r = ::stat(initfn.c_str(), &st);
+      if (r == 0) {
+       modules->insert(n);
+      }
+    }
+  }
+  closedir(dir);
+}
+
+void PyModuleRegistry::list_modules(std::set<std::string> *modules)
+{
+  _list_modules(g_conf->get_val<std::string>("mgr_module_path"), modules);
+}
+
diff --git a/src/mgr/PyModuleRegistry.h b/src/mgr/PyModuleRegistry.h
new file mode 100644 (file)
index 0000000..0204ee3
--- /dev/null
@@ -0,0 +1,146 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#pragma once
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+
+// ActivePyModules
+#include "PyModules.h"
+
+class PyModule
+{
+private:
+  const std::string module_name;
+  std::string get_site_packages();
+
+public:
+  PyThreadState *pMyThreadState = nullptr;
+  PyObject *pClass = nullptr;
+
+  PyModule(const std::string &module_name_)
+    : module_name(module_name_)
+  {
+  }
+  int load(PyThreadState *pMainThreadState);
+};
+
+/**
+ * This class is responsible for setting up the python runtime environment
+ * and importing the python modules.
+ *
+ * It is *not* responsible for constructing instances of their BaseMgrModule
+ * subclasses.
+ */
+class PyModuleRegistry
+{
+private:
+  LogChannelRef clog;
+
+  std::map<std::string, std::unique_ptr<PyModule>> modules;
+
+  std::unique_ptr<ActivePyModules> active_modules;
+
+  mutable Mutex lock{"PyModuleRegistry::lock"};
+
+  PyThreadState *pMainThreadState = nullptr;
+
+  // We have our own copy of MgrMap, because we are constructed
+  // before ClusterState exists.
+  MgrMap mgr_map;
+
+public:
+  static std::string config_prefix;
+
+  static void list_modules(std::set<std::string> *modules);
+
+  PyModuleRegistry(LogChannelRef clog_)
+    : clog(clog_)
+  {}
+
+  bool handle_mgr_map(const MgrMap &mgr_map_)
+  {
+    Mutex::Locker l(lock);
+
+    bool modules_changed = mgr_map_.modules != mgr_map.modules;
+    mgr_map = mgr_map_;
+    return modules_changed;
+  }
+
+  bool is_initialized() const
+  {
+    return mgr_map.epoch > 0;
+  }
+
+  int init(const MgrMap &map);
+
+  void active_start(
+                PyModuleConfig &config_,
+                DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+                LogChannelRef clog_, Objecter &objecter_, Client &client_,
+                Finisher &f);
+  void standby_start();
+
+  void shutdown();
+
+  template<typename Callback, typename...Args>
+  void with_active_modules(Callback&& cb, Args&&...args) const
+  {
+    Mutex::Locker l(lock);
+    assert(active_modules != nullptr);
+
+    std::forward<Callback>(cb)(*active_modules, std::forward<Args>(args)...);
+  }
+
+  // FIXME: breaking interface so that I don't have to go rewrite all
+  // the places that call into these (for now)
+  // >>>
+  void notify_all(const std::string &notify_type,
+                  const std::string &notify_id)
+  {
+    if (active_modules) {
+      active_modules->notify_all(notify_type, notify_id);
+    }
+  }
+
+  void notify_all(const LogEntry &log_entry)
+  {
+    if (active_modules) {
+      active_modules->notify_all(log_entry);
+    }
+  }
+
+  std::vector<MonCommand> get_commands() const
+  {
+    assert(active_modules);
+    return active_modules->get_commands();
+  }
+  std::vector<ModuleCommand> get_py_commands() const
+  {
+    assert(active_modules);
+    return active_modules->get_py_commands();
+  }
+  void get_health_checks(health_check_map_t *checks)
+  {
+    assert(active_modules);
+    active_modules->get_health_checks(checks);
+  }
+  std::map<std::string, std::string> get_services() const
+  {
+    assert(active_modules);
+    return active_modules->get_services();
+  }
+  // <<< (end of ActivePyModules cheeky call-throughs)
+};
index 401ec504a10a1988de8fe12e69e59de524849497..99fb87c8e7fb423a7c4d1261c3c907f2f6d37d9a 100644 (file)
@@ -25,6 +25,9 @@
 
 #include "mgr/MgrContext.h"
 
+// For ::config_prefix
+#include "PyModuleRegistry.h"
+
 #include "PyModules.h"
 
 #define dout_context g_ceph_context
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 
-// definition for non-const static member
-std::string PyModules::config_prefix;
-
 // constructor/destructor implementations cannot be in .h,
 // because ServeThread is still an "incomplete" type there
 
-PyModules::PyModules(DaemonStateIndex &ds, ClusterState &cs,
+ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
+          DaemonStateIndex &ds, ClusterState &cs,
          MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
           Client &client_, Finisher &f)
-  : daemon_state(ds), cluster_state(cs), monc(mc), clog(clog_),
-    objecter(objecter_), client(client_), finisher(f),
-    lock("PyModules")
+  : config_cache(config_), daemon_state(ds), cluster_state(cs),
+    monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
+    lock("ActivePyModules")
 {}
 
-PyModules::~PyModules() = default;
+ActivePyModules::~ActivePyModules() = default;
 
-void PyModules::dump_server(const std::string &hostname,
+void ActivePyModules::dump_server(const std::string &hostname,
                       const DaemonStateCollection &dmc,
                       Formatter *f)
 {
@@ -82,7 +83,7 @@ void PyModules::dump_server(const std::string &hostname,
 
 
 
-PyObject *PyModules::get_server_python(const std::string &hostname)
+PyObject *ActivePyModules::get_server_python(const std::string &hostname)
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
@@ -97,7 +98,7 @@ PyObject *PyModules::get_server_python(const std::string &hostname)
 }
 
 
-PyObject *PyModules::list_servers_python()
+PyObject *ActivePyModules::list_servers_python()
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
@@ -119,7 +120,7 @@ PyObject *PyModules::list_servers_python()
   return f.get();
 }
 
-PyObject *PyModules::get_metadata_python(
+PyObject *ActivePyModules::get_metadata_python(
   const std::string &svc_type,
   const std::string &svc_id)
 {
@@ -139,7 +140,7 @@ PyObject *PyModules::get_metadata_python(
   return f.get();
 }
 
-PyObject *PyModules::get_daemon_status_python(
+PyObject *ActivePyModules::get_daemon_status_python(
   const std::string &svc_type,
   const std::string &svc_id)
 {
@@ -157,7 +158,7 @@ PyObject *PyModules::get_daemon_status_python(
   return f.get();
 }
 
-PyObject *PyModules::get_python(const std::string &what)
+PyObject *ActivePyModules::get_python(const std::string &what)
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
@@ -322,174 +323,29 @@ PyObject *PyModules::get_python(const std::string &what)
   }
 }
 
-std::string PyModules::get_site_packages()
-{
-  std::stringstream site_packages;
-
-  // CPython doesn't auto-add site-packages dirs to sys.path for us,
-  // but it does provide a module that we can ask for them.
-  auto site_module = PyImport_ImportModule("site");
-  assert(site_module);
-
-  auto site_packages_fn = PyObject_GetAttrString(site_module, "getsitepackages");
-  if (site_packages_fn != nullptr) {
-    auto site_packages_list = PyObject_CallObject(site_packages_fn, nullptr);
-    assert(site_packages_list);
-
-    auto n = PyList_Size(site_packages_list);
-    for (Py_ssize_t i = 0; i < n; ++i) {
-      if (i != 0) {
-        site_packages << ":";
-      }
-      site_packages << PyString_AsString(PyList_GetItem(site_packages_list, i));
-    }
-
-    Py_DECREF(site_packages_list);
-    Py_DECREF(site_packages_fn);
-  } else {
-    // Fall back to generating our own site-packages paths by imitating
-    // what the standard site.py does.  This is annoying but it lets us
-    // run inside virtualenvs :-/
-
-    auto site_packages_fn = PyObject_GetAttrString(site_module, "addsitepackages");
-    assert(site_packages_fn);
-
-    auto known_paths = PySet_New(nullptr);
-    auto pArgs = PyTuple_Pack(1, known_paths);
-    PyObject_CallObject(site_packages_fn, pArgs);
-    Py_DECREF(pArgs);
-    Py_DECREF(known_paths);
-    Py_DECREF(site_packages_fn);
-
-    auto sys_module = PyImport_ImportModule("sys");
-    assert(sys_module);
-    auto sys_path = PyObject_GetAttrString(sys_module, "path");
-    assert(sys_path);
-
-    dout(1) << "sys.path:" << dendl;
-    auto n = PyList_Size(sys_path);
-    bool first = true;
-    for (Py_ssize_t i = 0; i < n; ++i) {
-      dout(1) << "  " << PyString_AsString(PyList_GetItem(sys_path, i)) << dendl;
-      if (first) {
-        first = false;
-      } else {
-        site_packages << ":";
-      }
-      site_packages << PyString_AsString(PyList_GetItem(sys_path, i));
-    }
-
-    Py_DECREF(sys_path);
-    Py_DECREF(sys_module);
-  }
-
-  Py_DECREF(site_module);
-
-  return site_packages.str();
-}
-
-
-int PyModules::init()
+void ActivePyModules::start_one(std::string const &module_name,
+    PyObject *pClass, PyThreadState *pMyThreadState)
 {
-  Mutex::Locker locker(lock);
-
-  // namespace in config-key prefixed by "mgr/"
-  config_prefix = std::string(g_conf->name.get_type_str()) + "/";
-
-  // Set up global python interpreter
-  Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));
-  Py_InitializeEx(0);
-
-  // Let CPython know that we will be calling it back from other
-  // threads in future.
-  if (! PyEval_ThreadsInitialized()) {
-    PyEval_InitThreads();
-  }
-
-  // Configure sys.path to include mgr_module_path
-  std::string sys_path = std::string(Py_GetPath()) + ":" + get_site_packages()
-                         + ":" + g_conf->get_val<std::string>("mgr_module_path");
-  dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
-
-  // Drop the GIL and remember the main thread state (current
-  // thread state becomes NULL)
-  pMainThreadState = PyEval_SaveThread();
-  assert(pMainThreadState != nullptr);
-
-  std::list<std::string> failed_modules;
-
-  // Load python code
-  set<string> ls;
-  cluster_state.with_mgrmap([&](const MgrMap& m) {
-      ls = m.modules;
-    });
-  for (const auto& module_name : ls) {
-    dout(1) << "Loading python module '" << module_name << "'" << dendl;
-    auto mod = std::unique_ptr<MgrPyModule>(new MgrPyModule(module_name, sys_path, pMainThreadState));
-    int r = mod->load(this);
-    if (r != 0) {
-      // Don't use handle_pyerror() here; we don't have the GIL
-      // or the right thread state (this is deliberate).
-      derr << "Error loading module '" << module_name << "': "
-        << cpp_strerror(r) << dendl;
-      failed_modules.push_back(module_name);
-      // Don't drop out here, load the other modules
-    } else {
-      // Success!
-      modules[module_name] = std::move(mod);
-    }
-  }
-
-  if (!failed_modules.empty()) {
-    clog->error() << "Failed to load ceph-mgr modules: " << joinify(
-        failed_modules.begin(), failed_modules.end(), std::string(", "));
-  }
-
-  return 0;
-}
-
-class ServeThread : public Thread
-{
-  MgrPyModule *mod;
-
-public:
-  bool running;
-
-  ServeThread(MgrPyModule *mod_)
-    : mod(mod_) {}
-
-  void *entry() override
-  {
-    running = true;
+  Mutex::Locker l(lock);
 
-    // No need to acquire the GIL here; the module does it.
-    dout(4) << "Entering thread for " << mod->get_name() << dendl;
-    mod->serve();
+  assert(modules.count(module_name) == 0);
 
-    running = false;
-    return nullptr;
-  }
-};
+  modules[module_name].reset(new ActivePyModule(
+      module_name, pClass,
+      pMyThreadState));
 
-void PyModules::start()
-{
-  Mutex::Locker l(lock);
+  int r = modules[module_name]->load(this);
 
-  dout(1) << "Creating threads for " << modules.size() << " modules" << dendl;
-  for (auto& i : modules) {
-    auto thread = new ServeThread(i.second.get());
-    serve_threads[i.first].reset(thread);
-  }
+  // FIXME error handling
+  assert(r == 0);
 
-  for (auto &i : serve_threads) {
-    std::ostringstream thread_name;
-    thread_name << "mgr." << i.first;
-    dout(4) << "Starting thread for " << i.first << dendl;
-    i.second->create(thread_name.str().c_str());
-  }
+  std::ostringstream thread_name;
+  thread_name << "mgr." << module_name;
+  dout(4) << "Starting thread for " << module_name << dendl;
+  modules[module_name]->thread.create(thread_name.str().c_str());
 }
 
-void PyModules::shutdown()
+void ActivePyModules::shutdown()
 {
   Mutex::Locker locker(lock);
 
@@ -506,20 +362,16 @@ void PyModules::shutdown()
 
   // For modules implementing serve(), finish the threads where we
   // were running that.
-  for (auto &i : serve_threads) {
+  for (auto &i : modules) {
     lock.Unlock();
-    i.second->join();
+    i.second->thread.join();
     lock.Lock();
   }
-  serve_threads.clear();
 
   modules.clear();
-
-  PyEval_RestoreThread(pMainThreadState);
-  Py_Finalize();
 }
 
-void PyModules::notify_all(const std::string &notify_type,
+void ActivePyModules::notify_all(const std::string &notify_type,
                      const std::string &notify_id)
 {
   Mutex::Locker l(lock);
@@ -527,8 +379,6 @@ void PyModules::notify_all(const std::string &notify_type,
   dout(10) << __func__ << ": notify_all " << notify_type << dendl;
   for (auto& i : modules) {
     auto module = i.second.get();
-    if (!serve_threads[i.first]->running)
-      continue;
     // Send all python calls down a Finisher to avoid blocking
     // C++ code, and avoid any potential lock cycles.
     finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
@@ -537,15 +387,13 @@ void PyModules::notify_all(const std::string &notify_type,
   }
 }
 
-void PyModules::notify_all(const LogEntry &log_entry)
+void ActivePyModules::notify_all(const LogEntry &log_entry)
 {
   Mutex::Locker l(lock);
 
   dout(10) << __func__ << ": notify_all (clog)" << dendl;
   for (auto& i : modules) {
     auto module = i.second.get();
-    if (!serve_threads[i.first]->running)
-      continue;
     // Send all python calls down a Finisher to avoid blocking
     // C++ code, and avoid any potential lock cycles.
     //
@@ -558,14 +406,15 @@ void PyModules::notify_all(const LogEntry &log_entry)
   }
 }
 
-bool PyModules::get_config(const std::string &module_name,
+bool ActivePyModules::get_config(const std::string &module_name,
     const std::string &key, std::string *val) const
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
   PyEval_RestoreThread(tstate);
 
-  const std::string global_key = config_prefix + module_name + "/" + key;
+  const std::string global_key = PyModuleRegistry::config_prefix
+    + module_name + "/" + key;
 
   dout(4) << __func__ << "key: " << global_key << dendl;
 
@@ -577,14 +426,15 @@ bool PyModules::get_config(const std::string &module_name,
   }
 }
 
-PyObject *PyModules::get_config_prefix(const std::string &module_name,
+PyObject *ActivePyModules::get_config_prefix(const std::string &module_name,
     const std::string &prefix) const
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
   PyEval_RestoreThread(tstate);
 
-  const std::string base_prefix = config_prefix + module_name + "/";
+  const std::string base_prefix = PyModuleRegistry::config_prefix
+                                    + module_name + "/";
   const std::string global_prefix = base_prefix + prefix;
   dout(4) << __func__ << "prefix: " << global_prefix << dendl;
 
@@ -597,10 +447,11 @@ PyObject *PyModules::get_config_prefix(const std::string &module_name,
   return f.get();
 }
 
-void PyModules::set_config(const std::string &module_name,
+void ActivePyModules::set_config(const std::string &module_name,
     const std::string &key, const boost::optional<std::string>& val)
 {
-  const std::string global_key = config_prefix + module_name + "/" + key;
+  const std::string global_key = PyModuleRegistry::config_prefix
+                                   + module_name + "/" + key;
 
   Command set_cmd;
   {
@@ -640,7 +491,7 @@ void PyModules::set_config(const std::string &module_name,
   }
 }
 
-std::vector<ModuleCommand> PyModules::get_py_commands() const
+std::vector<ModuleCommand> ActivePyModules::get_py_commands() const
 {
   Mutex::Locker l(lock);
 
@@ -656,7 +507,7 @@ std::vector<ModuleCommand> PyModules::get_py_commands() const
   return result;
 }
 
-std::vector<MonCommand> PyModules::get_commands() const
+std::vector<MonCommand> ActivePyModules::get_commands() const
 {
   std::vector<ModuleCommand> commands = get_py_commands();
   std::vector<MonCommand> result;
@@ -668,7 +519,7 @@ std::vector<MonCommand> PyModules::get_commands() const
 }
 
 
-std::map<std::string, std::string> PyModules::get_services() const
+std::map<std::string, std::string> ActivePyModules::get_services() const
 {
   std::map<std::string, std::string> result;
   Mutex::Locker l(lock);
@@ -683,16 +534,7 @@ std::map<std::string, std::string> PyModules::get_services() const
   return result;
 }
 
-void PyModules::insert_config(const std::map<std::string,
-                              std::string> &new_config)
-{
-  Mutex::Locker l(lock);
-
-  dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
-  config_cache = new_config;
-}
-
-void PyModules::log(const std::string &module_name,
+void ActivePyModules::log(const std::string &module_name,
     int level, const std::string &record)
 {
 #undef dout_prefix
@@ -702,7 +544,7 @@ void PyModules::log(const std::string &module_name,
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 }
 
-PyObject* PyModules::get_counter_python(
+PyObject* ActivePyModules::get_counter_python(
     const std::string &svc_name,
     const std::string &svc_id,
     const std::string &path)
@@ -743,7 +585,7 @@ PyObject* PyModules::get_counter_python(
   return f.get();
 }
 
-PyObject* PyModules::get_perf_schema_python(
+PyObject* ActivePyModules::get_perf_schema_python(
     const std::string svc_type,
     const std::string &svc_id)
 {
@@ -798,7 +640,7 @@ PyObject* PyModules::get_perf_schema_python(
   return f.get();
 }
 
-PyObject *PyModules::get_context()
+PyObject *ActivePyModules::get_context()
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
@@ -819,7 +661,7 @@ static void delete_osdmap(PyObject *object)
   delete osdmap;
 }
 
-PyObject *PyModules::get_osdmap()
+PyObject *ActivePyModules::get_osdmap()
 {
   PyThreadState *tstate = PyEval_SaveThread();
   Mutex::Locker l(lock);
@@ -834,37 +676,7 @@ PyObject *PyModules::get_osdmap()
   return PyCapsule_New(newmap, nullptr, &delete_osdmap);
 }
 
-static void _list_modules(
-  const std::string path,
-  std::set<std::string> *modules)
-{
-  DIR *dir = opendir(path.c_str());
-  if (!dir) {
-    return;
-  }
-  struct dirent *entry = NULL;
-  while ((entry = readdir(dir)) != NULL) {
-    string n(entry->d_name);
-    string fn = path + "/" + n;
-    struct stat st;
-    int r = ::stat(fn.c_str(), &st);
-    if (r == 0 && S_ISDIR(st.st_mode)) {
-      string initfn = fn + "/module.py";
-      r = ::stat(initfn.c_str(), &st);
-      if (r == 0) {
-       modules->insert(n);
-      }
-    }
-  }
-  closedir(dir);
-}
-
-void PyModules::list_modules(std::set<std::string> *modules)
-{
-  _list_modules(g_conf->get_val<std::string>("mgr_module_path"), modules);
-}
-
-void PyModules::set_health_checks(const std::string& module_name,
+void ActivePyModules::set_health_checks(const std::string& module_name,
                                  health_check_map_t&& checks)
 {
   Mutex::Locker l(lock);
@@ -874,7 +686,7 @@ void PyModules::set_health_checks(const std::string& module_name,
   }
 }
 
-void PyModules::get_health_checks(health_check_map_t *checks)
+void ActivePyModules::get_health_checks(health_check_map_t *checks)
 {
   Mutex::Locker l(lock);
   for (auto& p : modules) {
@@ -882,7 +694,7 @@ void PyModules::get_health_checks(health_check_map_t *checks)
   }
 }
 
-void PyModules::set_uri(const std::string& module_name,
+void ActivePyModules::set_uri(const std::string& module_name,
                         const std::string &uri)
 {
   Mutex::Locker l(lock);
index 17553541ee5ad1b40ea30787a43a1f31d1fb42ca..1dcc15b07f514e5670cecc271c6831dd22acab09 100644 (file)
@@ -18,7 +18,6 @@
 
 #include "common/Finisher.h"
 #include "common/Mutex.h"
-#include "common/Thread.h"
 
 #include "osdc/Objecter.h"
 #include "client/Client.h"
 class ServeThread;
 class health_check_map_t;
 
-class PyModules
+typedef std::map<std::string, std::string> PyModuleConfig;
+
+class ActivePyModules
 {
-  std::map<std::string, std::unique_ptr<MgrPyModule>> modules;
-  std::map<std::string, std::unique_ptr<ServeThread>> serve_threads;
+
+  std::map<std::string, std::unique_ptr<ActivePyModule>> modules;
+  PyModuleConfig config_cache;
   DaemonStateIndex &daemon_state;
   ClusterState &cluster_state;
   MonClient &monc;
@@ -44,20 +46,16 @@ class PyModules
   Client   &client;
   Finisher &finisher;
 
-  mutable Mutex lock{"PyModules::lock"};
 
-  std::string get_site_packages();
-
-  PyThreadState *pMainThreadState = nullptr;
+  mutable Mutex lock{"ActivePyModules::lock"};
 
 public:
-  static std::string config_prefix;
-
-  PyModules(DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+  ActivePyModules(PyModuleConfig const &config_,
+            DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
             LogChannelRef clog_, Objecter &objecter_, Client &client_,
             Finisher &f);
 
-  ~PyModules();
+  ~ActivePyModules();
 
   // FIXME: wrap for send_command?
   MonClient &get_monc() {return monc;}
@@ -97,16 +95,12 @@ public:
   void log(const std::string &module_name,
            int level, const std::string &record);
 
-  std::map<std::string, std::string> config_cache;
-
   // Python command definitions, including callback
   std::vector<ModuleCommand> get_py_commands() const;
 
   // Monitor command definitions, suitable for CLI
   std::vector<MonCommand> get_commands() const;
 
-  void insert_config(const std::map<std::string, std::string> &new_config);
-
   std::map<std::string, std::string> get_services() const;
 
   // Public so that MonCommandCompletion can use it
@@ -117,14 +111,15 @@ public:
   void notify_all(const LogEntry &log_entry);
 
   int init();
-  void start();
   void shutdown();
 
+  void start_one(std::string const &module_name,
+      PyObject *pClass,
+      PyThreadState *pMyThreadState);
+
   void dump_server(const std::string &hostname,
                    const DaemonStateCollection &dmc,
                    Formatter *f);
-
-  static void list_modules(std::set<std::string> *modules);
 };
 
 #endif
index a0112c5e49967614e16bd755a4fbdbd210fd7e51..e37e4aa22170a4b026baf4e8ecd46033d1d74822 100644 (file)
 
 typedef struct {
   PyObject_HEAD
-  PyModules *py_modules;
-  MgrPyModule *this_module;
+  ActivePyModules *py_modules;
+  ActivePyModule *this_module;
 } BaseMgrModule;
 
 class MonCommandCompletion : public Context
 {
-  PyModules *py_modules;
+  ActivePyModules *py_modules;
   PyObject *python_completion;
   const std::string tag;
   SafeThreadState pThreadState;
@@ -51,7 +51,7 @@ public:
   bufferlist outbl;
 
   MonCommandCompletion(
-      PyModules *py_modules_, PyObject* ev,
+      ActivePyModules *py_modules_, PyObject* ev,
       const std::string &tag_, PyThreadState *ts_)
     : py_modules(py_modules_), python_completion(ev),
       tag(tag_), pThreadState(ts_)
@@ -377,7 +377,7 @@ ceph_config_set(BaseMgrModule *self, PyObject *args)
   if (value) {
     val = value;
   }
-  global_handle->set_config(self->this_module->get_name(), key, val);
+  self->py_modules->set_config(self->this_module->get_name(), key, val);
 
   Py_RETURN_NONE;
 }
@@ -564,9 +564,11 @@ BaseMgrModule_init(BaseMgrModule *self, PyObject *args, PyObject *kwds)
         return -1;
     }
 
-    self->py_modules = (PyModules*)PyCapsule_GetPointer(py_modules_capsule, nullptr);
+    self->py_modules = (ActivePyModules*)PyCapsule_GetPointer(
+        py_modules_capsule, nullptr);
     assert(self->py_modules);
-    self->this_module = (MgrPyModule*)PyCapsule_GetPointer(this_module_capsule, nullptr);
+    self->this_module = (ActivePyModule*)PyCapsule_GetPointer(
+        this_module_capsule, nullptr);
     assert(self->this_module);
 
     return 0;
index 32eb5502175b9b87e03bd8b57e8174404d1ac1b6..44b5206d4e09e5915fbd1f493c2f8c8a89030c5f 100644 (file)
@@ -3,9 +3,6 @@
 
 #include "Python.h"
 
-class PyModules;
-
-extern PyModules *global_handle;
 extern PyTypeObject BaseMgrModuleType;
 
 #endif