"mgr", "module", "disable", "selftest")
self.wait_for_health_clear(timeout=30)
+
+ def test_module_remote(self):
+ """
+ Use the selftest module to exercise inter-module communication
+ """
+ self._load_module("selftest")
+ # The "self-test remote" operation just happens to call into
+ # influx.
+ self._load_module("influx")
+
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ "mgr", "self-test", "remote")
+
}
}
+bool ActivePyModule::method_exists(const std::string &method) const
+{
+ Gil gil(py_module->pMyThreadState, true);
+
+ auto boundMethod = PyObject_GetAttrString(pClassInstance, method.c_str());
+ if (boundMethod == nullptr) {
+ return false;
+ } else {
+ Py_DECREF(boundMethod);
+ return true;
+ }
+}
+
+PyObject *ActivePyModule::dispatch_remote(
+ const std::string &method,
+ PyObject *args,
+ PyObject *kwargs,
+ std::string *err)
+{
+ assert(err != nullptr);
+
+ // Rather than serializing arguments, pass the CPython objects.
+ // Works because we happen to know that the subinterpreter
+ // implementation shares a GIL, allocator, deallocator and GC state, so
+ // it's okay to pass the objects between subinterpreters.
+ // But in future this might involve serialization to support a CSP-aware
+ // future Python interpreter a la PEP554
+
+ Gil gil(py_module->pMyThreadState, true);
+
+ // Fire the receiving method
+ auto boundMethod = PyObject_GetAttrString(pClassInstance, method.c_str());
+
+ // Caller should have done method_exists check first!
+ assert(boundMethod != nullptr);
+
+ dout(20) << "Calling " << py_module->get_name()
+ << "." << method << "..." << dendl;
+
+ auto remoteResult = PyObject_Call(boundMethod,
+ args, kwargs);
+ Py_DECREF(boundMethod);
+
+ if (remoteResult == nullptr) {
+ // Because the caller is in a different context, we can't let this
+ // exception bubble up, need to re-raise it from the caller's
+ // context later.
+ *err = handle_pyerror();
+ } else {
+ dout(20) << "Success calling '" << method << "'" << dendl;
+ }
+
+ return remoteResult;
+}
+
int ActivePyModule::handle_command(
void notify(const std::string ¬ify_type, const std::string ¬ify_id);
void notify_clog(const LogEntry &le);
+ bool method_exists(const std::string &method) const;
+
+ PyObject *dispatch_remote(
+ const std::string &method,
+ PyObject *args,
+ PyObject *kwargs,
+ std::string *err);
+
int handle_command(
const cmdmap_t &cmdmap,
const bufferlist &inbuf,
std::stringstream *ds,
std::stringstream *ss);
+
void set_health_checks(health_check_map_t&& c) {
health_checks = std::move(c);
}
}
}
+PyObject *ActivePyModules::dispatch_remote(
+ const std::string &other_module,
+ const std::string &method,
+ PyObject *args,
+ PyObject *kwargs,
+ std::string *err)
+{
+ auto mod_iter = modules.find(other_module);
+ assert(mod_iter != modules.end());
+
+ return mod_iter->second->dispatch_remote(method, args, kwargs, err);
+}
+
bool ActivePyModules::get_config(const std::string &module_name,
const std::string &key, std::string *val) const
{
modules[module_name]->set_uri(uri);
}
+
const std::string ¬ify_id);
void notify_all(const LogEntry &log_entry);
+ bool module_exists(const std::string &name) const
+ {
+ return modules.count(name) > 0;
+ }
+
+ bool method_exists(
+ const std::string &module_name,
+ const std::string &method_name) const
+ {
+ return modules.at(module_name)->method_exists(method_name);
+ }
+
+ PyObject *dispatch_remote(
+ const std::string &other_module,
+ const std::string &method,
+ PyObject *args,
+ PyObject *kwargs,
+ std::string *err);
+
int init();
void shutdown();
}
}
+static PyObject *
+ceph_dispatch_remote(BaseMgrModule *self, PyObject *args)
+{
+ char *other_module = nullptr;
+ char *method = nullptr;
+ PyObject *remote_args = nullptr;
+ PyObject *remote_kwargs = nullptr;
+ if (!PyArg_ParseTuple(args, "ssOO:ceph_dispatch_remote",
+ &other_module, &method, &remote_args, &remote_kwargs)) {
+ return nullptr;
+ }
+
+ // Early error handling, because if the module doesn't exist then we
+ // won't be able to use its thread state to set python error state
+ // inside dispatch_remote().
+ if (!self->py_modules->module_exists(other_module)) {
+ derr << "no module '" << other_module << "'" << dendl;
+ PyErr_SetString(PyExc_ImportError, "Module not found");
+ return nullptr;
+ }
+
+ // Drop GIL from calling python thread state, it will be taken
+ // both for checking for method existence and for executing method.
+ PyThreadState *tstate = PyEval_SaveThread();
+
+ if (!self->py_modules->method_exists(other_module, method)) {
+ PyEval_RestoreThread(tstate);
+ PyErr_SetString(PyExc_NameError, "Method not found");
+ return nullptr;
+ }
+
+ std::string err;
+ auto result = self->py_modules->dispatch_remote(other_module, method,
+ remote_args, remote_kwargs, &err);
+
+ PyEval_RestoreThread(tstate);
+
+ if (result == nullptr) {
+ std::stringstream ss;
+ ss << "Remote method threw exception: " << err;
+ PyErr_SetString(PyExc_RuntimeError, ss.str().c_str());
+ derr << ss.str() << dendl;
+ }
+
+ return result;
+}
+
PyMethodDef BaseMgrModule_methods[] = {
{"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
METH_NOARGS, "Find out whether this mgr daemon currently has "
"a connection to a monitor"},
+ {"_ceph_dispatch_remote", (PyCFunction)ceph_dispatch_remote,
+ METH_VARARGS, "Dispatch a call to another module"},
+
{NULL, NULL, 0, NULL}
};
"""
return True, ""
+
+ def remote(self, module_name, method_name, *args, **kwargs):
+ """
+ Invoke a method on another module. All arguments, and the return
+ value from the other module must be serializable.
+
+ :param module_name: Name of other module. If module isn't loaded,
+ an ImportError exception is raised.
+ :param method_name: Method name. If it does not exist, a NameError
+ exception is raised.
+ :param args: Argument tuple
+ :param kwargs: Keyword argument dict
+ :return:
+ """
+ return self._ceph_dispatch_remote(module_name, method_name,
+ args, kwargs)
"desc": "Peek at a configuration value (localized variant)",
"perm": "rw"
},
+ {
+ "cmd": "mgr self-test remote",
+ "desc": "Test inter-module calls",
+ "perm": "r"
+ },
]
def __init__(self, *args, **kwargs):
return 0, str(self.get_config(command['key'])), ''
elif command['prefix'] == 'mgr self-test config get_localized':
return 0, str(self.get_localized_config(command['key'])), ''
+ elif command['prefix'] == 'mgr self-test remote':
+ self._test_remote_calls()
+ return 0, '', 'Successfully called'
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(command['prefix']))
self.log.info("Finished self-test procedure.")
+ def _test_remote_calls(self):
+ # Test making valid call
+ self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})
+
+ # Test calling module that exists but isn't enabled
+ mgr_map = self.get("mgr_map")
+ all_modules = [m['name'] for m in mgr_map['available_modules']]
+ disabled_modules = set(all_modules) - set(mgr_map['modules'])
+ disabled_module = list(disabled_modules)[0]
+ try:
+ self.remote(disabled_module, "handle_command", {"prefix": "influx self-test"})
+ except ImportError:
+ pass
+ else:
+ raise RuntimeError("ImportError not raised for disabled module")
+
+ # Test calling module that doesn't exist
+ try:
+ self.remote("idontexist", "handle_command", {"prefix": "influx self-test"})
+ except ImportError:
+ pass
+ else:
+ raise RuntimeError("ImportError not raised for nonexistent module")
+
+ # Test calling method that doesn't exist
+ try:
+ self.remote("influx", "idontexist", {"prefix": "influx self-test"})
+ except NameError:
+ pass
+ else:
+ raise RuntimeError("KeyError not raised")
+
+
def shutdown(self):
self._workload = self.SHUTDOWN
self._event.set()