From: John Spray Date: Mon, 4 Jul 2016 14:22:27 +0000 (+0100) Subject: mgr: enable multiple python modules X-Git-Tag: v11.0.1~60^2~45 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c81e542b32815effe9d107da378e6010e8a0b415;p=ceph.git mgr: enable multiple python modules serve() each one in a separate thread, include a shutdown() hook so that we can tear down cleanly. Signed-off-by: John Spray --- diff --git a/src/ceph_mgr.cc b/src/ceph_mgr.cc index 4a0fc758b7a9..4b4c62a61b40 100644 --- a/src/ceph_mgr.cc +++ b/src/ceph_mgr.cc @@ -23,6 +23,8 @@ #include "global/global_init.h" + + int main(int argc, const char **argv) { vector args; @@ -55,13 +57,6 @@ int main(int argc, const char **argv) } // Finally, execute the user's commands - rc = mgr.main(args); - if (rc != 0) { - std::cerr << "Error (" << cpp_strerror(rc) << ")" << std::endl; - } - - mgr.shutdown(); - - return rc; + return mgr.main(args); } diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index 35c6b075ba20..751979993687 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -15,6 +15,7 @@ #include "mon/MonClient.h" #include "include/stringify.h" #include "global/global_context.h" +#include "global/signal_handler.h" #include "mgr/MgrContext.h" @@ -213,6 +214,8 @@ int Mgr::init() finisher.start(); + py_modules.init(); + dout(4) << "Complete." << dendl; return 0; } @@ -339,6 +342,12 @@ void Mgr::load_config() py_modules.insert_config(loaded); } +void Mgr::handle_signal(int signum) +{ + Mutex::Locker l(lock); + assert(signum == SIGINT || signum == SIGTERM); + shutdown(); +} void Mgr::shutdown() { @@ -349,14 +358,13 @@ void Mgr::shutdown() // to touch references to the things we're about to tear down finisher.stop(); - lock.Lock(); + //lock.Lock(); timer.shutdown(); objecter->shutdown(); - lock.Unlock(); + //lock.Unlock(); monc->shutdown(); client_messenger->shutdown(); - client_messenger->wait(); } void Mgr::handle_osd_map() @@ -552,10 +560,33 @@ bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, return *authorizer != NULL; } +// A reference for use by the signal handler +Mgr *signal_mgr = nullptr; + +static void handle_mgr_signal(int signum) +{ + if (signal_mgr) { + signal_mgr->handle_signal(signum); + } +} int Mgr::main(vector args) { - return py_modules.main(args); + py_modules.start(); + + // Enable signal handlers + signal_mgr = this; + init_async_signal_handler(); + register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal); + + client_messenger->wait(); + + // Disable signal handlers + unregister_async_signal_handler(SIGINT, handle_mgr_signal); + unregister_async_signal_handler(SIGTERM, handle_mgr_signal); + shutdown_async_signal_handler(); + signal_mgr = nullptr; } diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h index 4a2ce779c8ff..e85358f06b7d 100644 --- a/src/mgr/Mgr.h +++ b/src/mgr/Mgr.h @@ -81,6 +81,7 @@ public: void shutdown(); void usage() {} int main(vector args); + void handle_signal(int signum); }; #endif /* MDS_UTILITY_H_ */ diff --git a/src/mgr/MgrPyModule.cc b/src/mgr/MgrPyModule.cc index c1dee9f1968c..4e8d0d3bf64e 100644 --- a/src/mgr/MgrPyModule.cc +++ b/src/mgr/MgrPyModule.cc @@ -93,6 +93,27 @@ int MgrPyModule::serve() return 0; } +// FIXME: DRY wrt serve +void MgrPyModule::shutdown() +{ + assert(pClassInstance != nullptr); + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + auto pValue = PyObject_CallMethod(pClassInstance, + (const char*)"shutdown", (const char*)"()"); + + if (pValue != NULL) { + Py_DECREF(pValue); + } else { + PyErr_Print(); + derr << "Failed to invoke shutdown() on " << module_name << dendl; + } + + PyGILState_Release(gstate); +} + void MgrPyModule::notify(const std::string ¬ify_type, const std::string ¬ify_id) { assert(pClassInstance != nullptr); diff --git a/src/mgr/MgrPyModule.h b/src/mgr/MgrPyModule.h index 8a5dc94edef7..dd490d7e7402 100644 --- a/src/mgr/MgrPyModule.h +++ b/src/mgr/MgrPyModule.h @@ -46,7 +46,6 @@ private: PyObject *pClass; PyObject *pClassInstance; - std::vector commands; int load_commands(); @@ -57,6 +56,7 @@ public: int load(); int serve(); + void shutdown(); void notify(const std::string ¬ify_type, const std::string ¬ify_id); const std::vector &get_commands() const @@ -64,6 +64,11 @@ public: return commands; } + const std::string &get_name() const + { + return module_name; + } + int handle_command( const cmdmap_t &cmdmap, std::stringstream *ss, diff --git a/src/mgr/PyModules.cc b/src/mgr/PyModules.cc index f4b25f72e1b2..40bc68f9cee6 100644 --- a/src/mgr/PyModules.cc +++ b/src/mgr/PyModules.cc @@ -12,6 +12,9 @@ */ +#include +#include "common/errno.h" + #include "PyState.h" #include "PyFormatter.h" @@ -212,8 +215,11 @@ std::string handle_pyerror() return extract(formatted); } -int PyModules::main(vector args) + +int PyModules::init() { + Mutex::Locker locker(lock); + global_handle = this; // Set up global python interpreter @@ -252,53 +258,103 @@ int PyModules::main(vector args) } // Load python code - // TODO load mgr_modules list, run them all in a thread each. - auto mod = new MgrPyModule("rest"); - int r = mod->load(); - if (r != 0) { - derr << "Error loading python module" << dendl; - derr << handle_pyerror() << dendl; -#if 0 - PyObject *ptype, *pvalue, *ptraceback; - PyErr_Fetch(&ptype, &pvalue, &ptraceback); - if (ptype) { - if (pvalue) { - char *pStrErrorMessage = PyString_AsString(pvalue); - -XXX why is pvalue giving null when converted to string? - - assert(pStrErrorMessage != nullptr); - derr << "Exception: " << pStrErrorMessage << dendl; - Py_DECREF(ptraceback); - Py_DECREF(pvalue); - } - Py_DECREF(ptype); + boost::tokenizer<> tok(g_conf->mgr_modules); + for(boost::tokenizer<>::iterator module_name=tok.begin(); + module_name != tok.end();++module_name){ + dout(1) << "Loading python module '" << *module_name << "'" << dendl; + auto mod = new MgrPyModule(*module_name); + int r = mod->load(); + if (r != 0) { + derr << "Error loading module '" << *module_name << "': " + << cpp_strerror(r) << dendl; + derr << handle_pyerror() << dendl; + + return r; + } else { + // Success! + modules[*module_name] = mod; } + } + + // Drop the GIL +#if 1 + PyThreadState *tstate = PyEval_SaveThread(); +#else + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + PyGILState_Release(gstate); #endif + + return 0; +} - // FIXME: be tolerant of bad modules, log an error and continue - // to load other, healthy modules. - return r; - } +class ServeThread : public Thread +{ + MgrPyModule *mod; + +public: + ServeThread(MgrPyModule *mod_) + : mod(mod_) {} + + void *entry() { - Mutex::Locker locker(lock); - modules["rest"] = mod; - } + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); - // Execute python server - mod->serve(); + dout(4) << "Entering thread for " << mod->get_name() << dendl; + mod->serve(); + + PyGILState_Release(gstate); + + return nullptr; + } +}; +void PyModules::start() +{ { - Mutex::Locker locker(lock); - // Tear down modules - for (auto i : modules) { - delete i.second; + Mutex::Locker l(lock); + for (auto &i : modules) { + auto thread = new ServeThread(i.second); + serve_threads[i.first] = thread; } - modules.clear(); } + for (auto &i : serve_threads) { + std::ostringstream thread_name; + thread_name << "mgr." << i.first; + dout(4) << "Starting thread for " << i.first << dendl; + i.second->create(thread_name.str().c_str()); + } +} + +void PyModules::shutdown() +{ + Mutex::Locker locker(lock); + + // Signal modules to drop out of serve() + for (auto i : modules) { + auto module = i.second; + finisher.queue(new C_StdFunction([module](){ + module->shutdown(); + })); + } + + for (auto &i : serve_threads) { + lock.Unlock(); + i.second->join(); + lock.Lock(); + delete i.second; + } + serve_threads.clear(); + + // Tear down modules + for (auto i : modules) { + delete i.second; + } + modules.clear(); + Py_Finalize(); - return 0; } void PyModules::notify_all(const std::string ¬ify_type, @@ -320,6 +376,10 @@ void PyModules::notify_all(const std::string ¬ify_type, bool PyModules::get_config(const std::string &handle, const std::string &key, std::string *val) const { + PyThreadState *tstate = PyEval_SaveThread(); + Mutex::Locker l(lock); + PyEval_RestoreThread(tstate); + const std::string global_key = config_prefix + handle + "." + key; if (config_cache.count(global_key)) { @@ -335,20 +395,25 @@ void PyModules::set_config(const std::string &handle, { const std::string global_key = config_prefix + handle + "." + key; - config_cache[global_key] = val; - - std::ostringstream cmd_json; Command set_cmd; - - JSONFormatter jf; - jf.open_object_section("cmd"); - jf.dump_string("prefix", "config-key put"); - jf.dump_string("key", global_key); - jf.dump_string("val", val); - jf.close_section(); - jf.flush(cmd_json); - - set_cmd.run(&monc, cmd_json.str()); + { + PyThreadState *tstate = PyEval_SaveThread(); + Mutex::Locker l(lock); + PyEval_RestoreThread(tstate); + config_cache[global_key] = val; + + std::ostringstream cmd_json; + + JSONFormatter jf; + jf.open_object_section("cmd"); + jf.dump_string("prefix", "config-key put"); + jf.dump_string("key", global_key); + jf.dump_string("val", val); + jf.close_section(); + jf.flush(cmd_json); + + set_cmd.run(&monc, cmd_json.str()); + } set_cmd.wait(); // FIXME: is config-key put ever allowed to fail? @@ -374,6 +439,8 @@ std::vector PyModules::get_commands() void PyModules::insert_config(const std::map &new_config) { + Mutex::Locker l(lock); + dout(4) << "Loaded " << new_config.size() << " config settings" << dendl; config_cache = new_config; } diff --git a/src/mgr/PyModules.h b/src/mgr/PyModules.h index 93e373409158..31c391888e35 100644 --- a/src/mgr/PyModules.h +++ b/src/mgr/PyModules.h @@ -18,26 +18,25 @@ #include "common/Finisher.h" #include "common/Mutex.h" - +#include "common/Thread.h" #include "DaemonState.h" #include "ClusterState.h" - - - +class ServeThread; class PyModules { protected: std::map modules; + std::map serve_threads; DaemonStateIndex &daemon_state; ClusterState &cluster_state; MonClient &monc; Finisher &finisher; - Mutex lock; + mutable Mutex lock; public: static constexpr auto config_prefix = "mgr."; @@ -65,15 +64,14 @@ public: void insert_config(const std::map &new_config); // Public so that MonCommandCompletion can use it - // FIXME: bit weird that we're sending command completions - // to all modules (we rely on them to ignore anything that - // they don't recognise), but when we get called from - // python-land we don't actually know who we are. Need - // to give python-land a handle in initialisation. + // FIXME: for send_command completion notifications, + // send it to only the module that sent the command, not everyone void notify_all(const std::string ¬ify_type, const std::string ¬ify_id); - int main(std::vector args); + int init(); + void start(); + void shutdown(); void dump_server(const std::string &hostname, const DaemonStateCollection &dmc,