]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: enable inter-module calls 27638/head
authorJohn Spray <john.spray@redhat.com>
Thu, 14 Dec 2017 11:24:18 +0000 (06:24 -0500)
committerKefu Chai <kchai@redhat.com>
Wed, 17 Apr 2019 12:05:27 +0000 (20:05 +0800)
This is being done by passing native CPython objects
back and forth.  It's safe because sub-interpreters in CPython
share memory allocation infrastructure and share the GIL.

With a view to PEP554, we limit inter-interpreter calls
to pickleable objects, so that this may be implemented
using byte-arrays in future.

This infrastructure should enable:
 - the dashboard to display the status of other modules, for
   example the set of progress indicators from `progress`
 - dashboard and restful to share an underlying long running
   job mechanism.

Signed-off-by: John Spray <john.spray@redhat.com>
(cherry picked from commit f02316adb4baf4dceaab79cc0ef4c2acdb544f3e)

qa/tasks/mgr/test_module_selftest.py
src/mgr/ActivePyModule.cc
src/mgr/ActivePyModule.h
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/pybind/mgr/mgr_module.py
src/pybind/mgr/selftest/module.py

index d1bc04b6ec3332a9f2a2524bd2a3eec2418f41f0..bec37ea26fed9a895182014eb3fa5fe1000b3475 100644 (file)
@@ -264,3 +264,16 @@ class TestModuleSelftest(MgrTestCase):
             "mgr", "module", "disable", "selftest")
 
         self.wait_for_health_clear(timeout=30)
+
+    def test_module_remote(self):
+        """
+        Use the selftest module to exercise inter-module communication
+        """
+        self._load_module("selftest")
+        # The "self-test remote" operation just happens to call into
+        # influx.
+        self._load_module("influx")
+
+        self.mgr_cluster.mon_manager.raw_cluster_cmd(
+            "mgr", "self-test", "remote")
+
index 42ad021d02d1b22ea3682ebd9419acbce9614109..5c9177ad8b52651cadaf82d947594ec017ee724f 100644 (file)
@@ -100,6 +100,61 @@ void ActivePyModule::notify_clog(const LogEntry &log_entry)
   }
 }
 
+bool ActivePyModule::method_exists(const std::string &method) const
+{
+  Gil gil(py_module->pMyThreadState, true);
+
+  auto boundMethod = PyObject_GetAttrString(pClassInstance, method.c_str());
+  if (boundMethod == nullptr) {
+    return false;
+  } else {
+    Py_DECREF(boundMethod);
+    return true;
+  }
+}
+
+PyObject *ActivePyModule::dispatch_remote(
+    const std::string &method,
+    PyObject *args,
+    PyObject *kwargs,
+    std::string *err)
+{
+  assert(err != nullptr);
+
+  // Rather than serializing arguments, pass the CPython objects.
+  // Works because we happen to know that the subinterpreter
+  // implementation shares a GIL, allocator, deallocator and GC state, so
+  // it's okay to pass the objects between subinterpreters.
+  // But in future this might involve serialization to support a CSP-aware
+  // future Python interpreter a la PEP554
+
+  Gil gil(py_module->pMyThreadState, true);
+
+  // Fire the receiving method
+  auto boundMethod = PyObject_GetAttrString(pClassInstance, method.c_str());
+
+  // Caller should have done method_exists check first!
+  assert(boundMethod != nullptr);
+
+  dout(20) << "Calling " << py_module->get_name()
+           << "." << method << "..." << dendl;
+
+  auto remoteResult = PyObject_Call(boundMethod,
+      args, kwargs);
+  Py_DECREF(boundMethod);
+
+  if (remoteResult == nullptr) {
+    // Because the caller is in a different context, we can't let this
+    // exception bubble up, need to re-raise it from the caller's
+    // context later.
+    *err = handle_pyerror();
+  } else {
+    dout(20) << "Success calling '" << method << "'" << dendl;
+  }
+
+  return remoteResult;
+}
+
 
 
 int ActivePyModule::handle_command(
index ebb1ba182be02d4dd72e437525bc1b7c51bc8187..6b5eeda616c27f9019a45b8cbe8b26380706c53c 100644 (file)
@@ -52,12 +52,21 @@ public:
   void notify(const std::string &notify_type, const std::string &notify_id);
   void notify_clog(const LogEntry &le);
 
+  bool method_exists(const std::string &method) const;
+
+  PyObject *dispatch_remote(
+      const std::string &method,
+      PyObject *args,
+      PyObject *kwargs,
+      std::string *err);
+
   int handle_command(
     const cmdmap_t &cmdmap,
     const bufferlist &inbuf,
     std::stringstream *ds,
     std::stringstream *ss);
 
+
   void set_health_checks(health_check_map_t&& c) {
     health_checks = std::move(c);
   }
index 4cdd9b7c66baf1895291c78a2bcc95c47a1cb036..2244eaeb72926c9b142652a5e6ba2cb2d4fd3d4e 100644 (file)
@@ -459,6 +459,19 @@ bool ActivePyModules::get_store(const std::string &module_name,
   }
 }
 
+PyObject *ActivePyModules::dispatch_remote(
+    const std::string &other_module,
+    const std::string &method,
+    PyObject *args,
+    PyObject *kwargs,
+    std::string *err)
+{
+  auto mod_iter = modules.find(other_module);
+  assert(mod_iter != modules.end());
+
+  return mod_iter->second->dispatch_remote(method, args, kwargs, err);
+}
+
 bool ActivePyModules::get_config(const std::string &module_name,
     const std::string &key, std::string *val) const
 {
@@ -835,3 +848,4 @@ void ActivePyModules::set_uri(const std::string& module_name,
   modules[module_name]->set_uri(uri);
 }
 
+
index 08a3f6b49c40562ff02e7a7e44dd91ce344c0f8b..92feba38fa14dcd027ca4234c8b8d8867452665f 100644 (file)
@@ -123,6 +123,25 @@ public:
                   const std::string &notify_id);
   void notify_all(const LogEntry &log_entry);
 
+  bool module_exists(const std::string &name) const
+  {
+    return modules.count(name) > 0;
+  }
+
+  bool method_exists(
+      const std::string &module_name,
+      const std::string &method_name) const
+  {
+    return modules.at(module_name)->method_exists(method_name);
+  }
+
+  PyObject *dispatch_remote(
+      const std::string &other_module,
+      const std::string &method,
+      PyObject *args,
+      PyObject *kwargs,
+      std::string *err);
+
   int init();
   void shutdown();
 
index 8b931091c7df9b990bed381075c8a17baa0ace17..beafa6e2ff7e036d3f0b471aeb964feba578ec05 100644 (file)
@@ -571,6 +571,53 @@ ceph_have_mon_connection(BaseMgrModule *self, PyObject *args)
   }
 }
 
+static PyObject *
+ceph_dispatch_remote(BaseMgrModule *self, PyObject *args)
+{
+  char *other_module = nullptr;
+  char *method = nullptr;
+  PyObject *remote_args = nullptr;
+  PyObject *remote_kwargs = nullptr;
+  if (!PyArg_ParseTuple(args, "ssOO:ceph_dispatch_remote",
+        &other_module, &method, &remote_args, &remote_kwargs)) {
+    return nullptr;
+  }
+
+  // Early error handling, because if the module doesn't exist then we
+  // won't be able to use its thread state to set python error state
+  // inside dispatch_remote().
+  if (!self->py_modules->module_exists(other_module)) {
+    derr << "no module '" << other_module << "'" << dendl;
+    PyErr_SetString(PyExc_ImportError, "Module not found");
+    return nullptr;
+  }
+
+  // Drop GIL from calling python thread state, it will be taken
+  // both for checking for method existence and for executing method.
+  PyThreadState *tstate = PyEval_SaveThread();
+
+  if (!self->py_modules->method_exists(other_module, method)) {
+    PyEval_RestoreThread(tstate);
+    PyErr_SetString(PyExc_NameError, "Method not found");
+    return nullptr;
+  }
+
+  std::string err;
+  auto result = self->py_modules->dispatch_remote(other_module, method,
+      remote_args, remote_kwargs, &err);
+
+  PyEval_RestoreThread(tstate);
+
+  if (result == nullptr) {
+    std::stringstream ss;
+    ss << "Remote method threw exception: " << err;
+    PyErr_SetString(PyExc_RuntimeError, ss.str().c_str());
+    derr << ss.str() << dendl;
+  }
+
+  return result;
+}
+
 
 PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
@@ -637,6 +684,9 @@ PyMethodDef BaseMgrModule_methods[] = {
     METH_NOARGS, "Find out whether this mgr daemon currently has "
                  "a connection to a monitor"},
 
+  {"_ceph_dispatch_remote", (PyCFunction)ceph_dispatch_remote,
+    METH_VARARGS, "Dispatch a call to another module"},
+
   {NULL, NULL, 0, NULL}
 };
 
index 0b1d31e6643b1d58d2fdcca4483fa366339821c7..08e917dd7beebeeb37163f8290e634f5a2eed09b 100644 (file)
@@ -827,3 +827,19 @@ class MgrModule(ceph_module.BaseMgrModule):
         """
 
         return True, ""
+
+    def remote(self, module_name, method_name, *args, **kwargs):
+        """
+        Invoke a method on another module.  All arguments, and the return
+        value from the other module must be serializable.
+
+        :param module_name: Name of other module.  If module isn't loaded,
+                            an ImportError exception is raised.
+        :param method_name: Method name.  If it does not exist, a NameError
+                            exception is raised.
+        :param args: Argument tuple
+        :param kwargs: Keyword argument dict
+        :return:
+        """
+        return self._ceph_dispatch_remote(module_name, method_name,
+                                          args, kwargs)
index 8ba36f622f65d7316c1c5989512aa5ca3f061646..802b757aca1baa2517916a6b4312d1d5a821647d 100644 (file)
@@ -60,6 +60,11 @@ class Module(MgrModule):
                 "desc": "Peek at a configuration value (localized variant)",
                 "perm": "rw"
             },
+            {
+                "cmd": "mgr self-test remote",
+                "desc": "Test inter-module calls",
+                "perm": "r"
+            },
             ]
 
     def __init__(self, *args, **kwargs):
@@ -93,6 +98,9 @@ class Module(MgrModule):
             return 0, str(self.get_config(command['key'])), ''
         elif command['prefix'] == 'mgr self-test config get_localized':
             return 0, str(self.get_localized_config(command['key'])), ''
+        elif command['prefix'] == 'mgr self-test remote':
+            self._test_remote_calls()
+            return 0, '', 'Successfully called'
         else:
             return (-errno.EINVAL, '',
                     "Command not found '{0}'".format(command['prefix']))
@@ -207,6 +215,39 @@ class Module(MgrModule):
 
         self.log.info("Finished self-test procedure.")
 
+    def _test_remote_calls(self):
+        # Test making valid call
+        self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})
+
+        # Test calling module that exists but isn't enabled
+        mgr_map = self.get("mgr_map")
+        all_modules = [m['name'] for m in mgr_map['available_modules']]
+        disabled_modules = set(all_modules) - set(mgr_map['modules'])
+        disabled_module = list(disabled_modules)[0]
+        try:
+            self.remote(disabled_module, "handle_command", {"prefix": "influx self-test"})
+        except ImportError:
+            pass
+        else:
+            raise RuntimeError("ImportError not raised for disabled module")
+
+        # Test calling module that doesn't exist
+        try:
+            self.remote("idontexist", "handle_command", {"prefix": "influx self-test"})
+        except ImportError:
+            pass
+        else:
+            raise RuntimeError("ImportError not raised for nonexistent module")
+
+        # Test calling method that doesn't exist
+        try:
+            self.remote("influx", "idontexist", {"prefix": "influx self-test"})
+        except NameError:
+            pass
+        else:
+            raise RuntimeError("KeyError not raised")
+
+
     def shutdown(self):
         self._workload = self.SHUTDOWN
         self._event.set()