From f02316adb4baf4dceaab79cc0ef4c2acdb544f3e Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 14 Dec 2017 06:24:18 -0500 Subject: [PATCH] mgr: enable inter-module calls This is being done by passing native CPython objects back and forth. It's safe because sub-interpreters in CPython share memory allocation infrastructure and share the GIL. With a view to PEP554, we limit inter-interpreter calls to pickleable objects, so that this may be implemented using byte-arrays in future. This infrastructure should enable: - the dashboard to display the status of other modules, for example the set of progress indicators from `progress` - dashboard and restful to share an underlying long running job mechanism. Signed-off-by: John Spray --- qa/tasks/mgr/test_module_selftest.py | 13 +++++++ src/mgr/ActivePyModule.cc | 55 ++++++++++++++++++++++++++++ src/mgr/ActivePyModule.h | 9 +++++ src/mgr/ActivePyModules.cc | 14 +++++++ src/mgr/ActivePyModules.h | 19 ++++++++++ src/mgr/BaseMgrModule.cc | 50 +++++++++++++++++++++++++ src/pybind/mgr/mgr_module.py | 16 ++++++++ src/pybind/mgr/selftest/module.py | 41 +++++++++++++++++++++ 8 files changed, 217 insertions(+) diff --git a/qa/tasks/mgr/test_module_selftest.py b/qa/tasks/mgr/test_module_selftest.py index 1d4dee05e39..5248fa41a2b 100644 --- a/qa/tasks/mgr/test_module_selftest.py +++ b/qa/tasks/mgr/test_module_selftest.py @@ -274,3 +274,16 @@ class TestModuleSelftest(MgrTestCase): "mgr", "module", "disable", "selftest") self.wait_for_health_clear(timeout=30) + + def test_module_remote(self): + """ + Use the selftest module to exercise inter-module communication + """ + self._load_module("selftest") + # The "self-test remote" operation just happens to call into + # influx. + self._load_module("influx") + + self.mgr_cluster.mon_manager.raw_cluster_cmd( + "mgr", "self-test", "remote") + diff --git a/src/mgr/ActivePyModule.cc b/src/mgr/ActivePyModule.cc index c89334e08a3..425b3f99c7f 100644 --- a/src/mgr/ActivePyModule.cc +++ b/src/mgr/ActivePyModule.cc @@ -100,6 +100,61 @@ void ActivePyModule::notify_clog(const LogEntry &log_entry) } } +bool ActivePyModule::method_exists(const std::string &method) const +{ + Gil gil(py_module->pMyThreadState, true); + + auto boundMethod = PyObject_GetAttrString(pClassInstance, method.c_str()); + if (boundMethod == nullptr) { + return false; + } else { + Py_DECREF(boundMethod); + return true; + } +} + +PyObject *ActivePyModule::dispatch_remote( + const std::string &method, + PyObject *args, + PyObject *kwargs, + std::string *err) +{ + assert(err != nullptr); + + // Rather than serializing arguments, pass the CPython objects. + // Works because we happen to know that the subinterpreter + // implementation shares a GIL, allocator, deallocator and GC state, so + // it's okay to pass the objects between subinterpreters. + // But in future this might involve serialization to support a CSP-aware + // future Python interpreter a la PEP554 + + Gil gil(py_module->pMyThreadState, true); + + // Fire the receiving method + auto boundMethod = PyObject_GetAttrString(pClassInstance, method.c_str()); + + // Caller should have done method_exists check first! + assert(boundMethod != nullptr); + + dout(20) << "Calling " << py_module->get_name() + << "." << method << "..." << dendl; + + auto remoteResult = PyObject_Call(boundMethod, + args, kwargs); + Py_DECREF(boundMethod); + + if (remoteResult == nullptr) { + // Because the caller is in a different context, we can't let this + // exception bubble up, need to re-raise it from the caller's + // context later. + *err = handle_pyerror(); + } else { + dout(20) << "Success calling '" << method << "'" << dendl; + } + + return remoteResult; +} + int ActivePyModule::handle_command( diff --git a/src/mgr/ActivePyModule.h b/src/mgr/ActivePyModule.h index ebb1ba182be..6b5eeda616c 100644 --- a/src/mgr/ActivePyModule.h +++ b/src/mgr/ActivePyModule.h @@ -52,12 +52,21 @@ public: void notify(const std::string ¬ify_type, const std::string ¬ify_id); void notify_clog(const LogEntry &le); + bool method_exists(const std::string &method) const; + + PyObject *dispatch_remote( + const std::string &method, + PyObject *args, + PyObject *kwargs, + std::string *err); + int handle_command( const cmdmap_t &cmdmap, const bufferlist &inbuf, std::stringstream *ds, std::stringstream *ss); + void set_health_checks(health_check_map_t&& c) { health_checks = std::move(c); } diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index 4f05de756e5..46a0f0a6c75 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -480,6 +480,19 @@ bool ActivePyModules::get_store(const std::string &module_name, } } +PyObject *ActivePyModules::dispatch_remote( + const std::string &other_module, + const std::string &method, + PyObject *args, + PyObject *kwargs, + std::string *err) +{ + auto mod_iter = modules.find(other_module); + assert(mod_iter != modules.end()); + + return mod_iter->second->dispatch_remote(method, args, kwargs, err); +} + bool ActivePyModules::get_config(const std::string &module_name, const std::string &key, std::string *val) const { @@ -820,3 +833,4 @@ void ActivePyModules::set_uri(const std::string& module_name, modules[module_name]->set_uri(uri); } + diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index 322f2bf10f0..d1f51c164e2 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -109,6 +109,25 @@ public: const std::string ¬ify_id); void notify_all(const LogEntry &log_entry); + bool module_exists(const std::string &name) const + { + return modules.count(name) > 0; + } + + bool method_exists( + const std::string &module_name, + const std::string &method_name) const + { + return modules.at(module_name)->method_exists(method_name); + } + + PyObject *dispatch_remote( + const std::string &other_module, + const std::string &method, + PyObject *args, + PyObject *kwargs, + std::string *err); + int init(); void shutdown(); diff --git a/src/mgr/BaseMgrModule.cc b/src/mgr/BaseMgrModule.cc index 1f1f8cc5f17..0556a1c4511 100644 --- a/src/mgr/BaseMgrModule.cc +++ b/src/mgr/BaseMgrModule.cc @@ -553,6 +553,53 @@ ceph_have_mon_connection(BaseMgrModule *self, PyObject *args) } } +static PyObject * +ceph_dispatch_remote(BaseMgrModule *self, PyObject *args) +{ + char *other_module = nullptr; + char *method = nullptr; + PyObject *remote_args = nullptr; + PyObject *remote_kwargs = nullptr; + if (!PyArg_ParseTuple(args, "ssOO:ceph_dispatch_remote", + &other_module, &method, &remote_args, &remote_kwargs)) { + return nullptr; + } + + // Early error handling, because if the module doesn't exist then we + // won't be able to use its thread state to set python error state + // inside dispatch_remote(). + if (!self->py_modules->module_exists(other_module)) { + derr << "no module '" << other_module << "'" << dendl; + PyErr_SetString(PyExc_ImportError, "Module not found"); + return nullptr; + } + + // Drop GIL from calling python thread state, it will be taken + // both for checking for method existence and for executing method. + PyThreadState *tstate = PyEval_SaveThread(); + + if (!self->py_modules->method_exists(other_module, method)) { + PyEval_RestoreThread(tstate); + PyErr_SetString(PyExc_NameError, "Method not found"); + return nullptr; + } + + std::string err; + auto result = self->py_modules->dispatch_remote(other_module, method, + remote_args, remote_kwargs, &err); + + PyEval_RestoreThread(tstate); + + if (result == nullptr) { + std::stringstream ss; + ss << "Remote method threw exception: " << err; + PyErr_SetString(PyExc_RuntimeError, ss.str().c_str()); + derr << ss.str() << dendl; + } + + return result; +} + PyMethodDef BaseMgrModule_methods[] = { {"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS, @@ -616,6 +663,9 @@ PyMethodDef BaseMgrModule_methods[] = { METH_NOARGS, "Find out whether this mgr daemon currently has " "a connection to a monitor"}, + {"_ceph_dispatch_remote", (PyCFunction)ceph_dispatch_remote, + METH_VARARGS, "Dispatch a call to another module"}, + {NULL, NULL, 0, NULL} }; diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index cca8eeffc27..501938e29d1 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -829,3 +829,19 @@ class MgrModule(ceph_module.BaseMgrModule): """ return True, "" + + def remote(self, module_name, method_name, *args, **kwargs): + """ + Invoke a method on another module. All arguments, and the return + value from the other module must be serializable. + + :param module_name: Name of other module. If module isn't loaded, + an ImportError exception is raised. + :param method_name: Method name. If it does not exist, a NameError + exception is raised. + :param args: Argument tuple + :param kwargs: Keyword argument dict + :return: + """ + return self._ceph_dispatch_remote(module_name, method_name, + args, kwargs) diff --git a/src/pybind/mgr/selftest/module.py b/src/pybind/mgr/selftest/module.py index 73f1ad70d04..68edd5b30d4 100644 --- a/src/pybind/mgr/selftest/module.py +++ b/src/pybind/mgr/selftest/module.py @@ -60,6 +60,11 @@ class Module(MgrModule): "desc": "Peek at a configuration value (localized variant)", "perm": "rw" }, + { + "cmd": "mgr self-test remote", + "desc": "Test inter-module calls", + "perm": "r" + }, ] def __init__(self, *args, **kwargs): @@ -93,6 +98,9 @@ class Module(MgrModule): return 0, str(self.get_config(command['key'])), '' elif command['prefix'] == 'mgr self-test config get_localized': return 0, str(self.get_localized_config(command['key'])), '' + elif command['prefix'] == 'mgr self-test remote': + self._test_remote_calls() + return 0, '', 'Successfully called' else: return (-errno.EINVAL, '', "Command not found '{0}'".format(command['prefix'])) @@ -207,6 +215,39 @@ class Module(MgrModule): self.log.info("Finished self-test procedure.") + def _test_remote_calls(self): + # Test making valid call + self.remote("influx", "handle_command", "", {"prefix": "influx self-test"}) + + # Test calling module that exists but isn't enabled + mgr_map = self.get("mgr_map") + all_modules = [m['name'] for m in mgr_map['available_modules']] + disabled_modules = set(all_modules) - set(mgr_map['modules']) + disabled_module = list(disabled_modules)[0] + try: + self.remote(disabled_module, "handle_command", {"prefix": "influx self-test"}) + except ImportError: + pass + else: + raise RuntimeError("ImportError not raised for disabled module") + + # Test calling module that doesn't exist + try: + self.remote("idontexist", "handle_command", {"prefix": "influx self-test"}) + except ImportError: + pass + else: + raise RuntimeError("ImportError not raised for nonexistent module") + + # Test calling method that doesn't exist + try: + self.remote("influx", "idontexist", {"prefix": "influx self-test"}) + except NameError: + pass + else: + raise RuntimeError("KeyError not raised") + + def shutdown(self): self._workload = self.SHUTDOWN self._event.set() -- 2.39.5