From c1471c7501948004096581ee415ab4a1fa2d9379 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 22 Aug 2017 14:42:11 -0400 Subject: [PATCH] mgr: standby modules come up and run now ...they still don't have access to any config though. Signed-off-by: John Spray --- src/mgr/ActivePyModule.cc | 5 +- src/mgr/ActivePyModules.cc | 18 ++- src/mgr/ActivePyModules.h | 6 +- src/mgr/BaseMgrModule.cc | 2 + src/mgr/BaseMgrModule.h | 6 +- src/mgr/BaseMgrStandbyModule.cc | 161 +++++++++++++++++++ src/mgr/BaseMgrStandbyModule.h | 7 + src/mgr/Mgr.cc | 2 +- src/mgr/MgrStandby.cc | 22 ++- src/mgr/PyModuleRegistry.cc | 90 ++++++++++- src/mgr/PyModuleRegistry.h | 26 +++- src/mgr/StandbyPyModules.cc | 268 ++++++++++++++++++++++++++++++++ src/mgr/StandbyPyModules.h | 202 ++++++++++++++++++++++++ 13 files changed, 784 insertions(+), 31 deletions(-) create mode 100644 src/mgr/BaseMgrStandbyModule.cc create mode 100644 src/mgr/BaseMgrStandbyModule.h create mode 100644 src/mgr/StandbyPyModules.cc create mode 100644 src/mgr/StandbyPyModules.h diff --git a/src/mgr/ActivePyModule.cc b/src/mgr/ActivePyModule.cc index c734a60c26a..6048dc707cf 100644 --- a/src/mgr/ActivePyModule.cc +++ b/src/mgr/ActivePyModule.cc @@ -24,6 +24,9 @@ #include "include/assert.h" // boost clobbers this +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix #define dout_prefix *_dout << "mgr " << __func__ << " " // decode a Python exception into a string @@ -49,8 +52,6 @@ std::string handle_pyerror() } - - void *ServeThread::entry() { // No need to acquire the GIL here; the module does it. diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index 8b10010157d..3dcf65dffd1 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -323,7 +323,7 @@ PyObject *ActivePyModules::get_python(const std::string &what) } } -void ActivePyModules::start_one(std::string const &module_name, +int ActivePyModules::start_one(std::string const &module_name, PyObject *pClass, PyThreadState *pMyThreadState) { Mutex::Locker l(lock); @@ -335,14 +335,16 @@ void ActivePyModules::start_one(std::string const &module_name, pMyThreadState)); int r = modules[module_name]->load(this); + if (r != 0) { + return r; + } else { + std::ostringstream thread_name; + thread_name << "mgr." << module_name; + dout(4) << "Starting thread for " << module_name << dendl; + modules[module_name]->thread.create(thread_name.str().c_str()); - // FIXME error handling - assert(r == 0); - - std::ostringstream thread_name; - thread_name << "mgr." << module_name; - dout(4) << "Starting thread for " << module_name << dendl; - modules[module_name]->thread.create(thread_name.str().c_str()); + return 0; + } } void ActivePyModules::shutdown() diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index 262d7538eca..6e97e244c1f 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -112,9 +112,9 @@ public: int init(); void shutdown(); - void start_one(std::string const &module_name, - PyObject *pClass, - PyThreadState *pMyThreadState); + int start_one(std::string const &module_name, + PyObject *pClass, + PyThreadState *pMyThreadState); void dump_server(const std::string &hostname, const DaemonStateCollection &dmc, diff --git a/src/mgr/BaseMgrModule.cc b/src/mgr/BaseMgrModule.cc index 512e8f68b4a..b16cc296502 100644 --- a/src/mgr/BaseMgrModule.cc +++ b/src/mgr/BaseMgrModule.cc @@ -18,6 +18,8 @@ * available as methods on that object. */ +#include "Python.h" + #include "Mgr.h" #include "mon/MonClient.h" diff --git a/src/mgr/BaseMgrModule.h b/src/mgr/BaseMgrModule.h index 44b5206d4e0..2c2e5deb348 100644 --- a/src/mgr/BaseMgrModule.h +++ b/src/mgr/BaseMgrModule.h @@ -1,9 +1,7 @@ -#ifndef PYSTATE_H_ -#define PYSTATE_H_ + +#pragma once #include "Python.h" extern PyTypeObject BaseMgrModuleType; -#endif - diff --git a/src/mgr/BaseMgrStandbyModule.cc b/src/mgr/BaseMgrStandbyModule.cc new file mode 100644 index 00000000000..b7bd0f66383 --- /dev/null +++ b/src/mgr/BaseMgrStandbyModule.cc @@ -0,0 +1,161 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#include "BaseMgrStandbyModule.h" + +#include "StandbyPyModules.h" + + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr + +typedef struct { + PyObject_HEAD + StandbyPyModule *this_module; +} BaseMgrStandbyModule; + +static PyObject * +BaseMgrStandbyModule_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + BaseMgrStandbyModule *self; + + self = (BaseMgrStandbyModule *)type->tp_alloc(type, 0); + + return (PyObject *)self; +} + +static int +BaseMgrStandbyModule_init(BaseMgrStandbyModule *self, PyObject *args, PyObject *kwds) +{ + PyObject *this_module_capsule = nullptr; + static const char *kwlist[] = {"this_module", NULL}; + + if (! PyArg_ParseTupleAndKeywords(args, kwds, "O", + const_cast(kwlist), + &this_module_capsule)) { + return -1; + } + + self->this_module = (StandbyPyModule*)PyCapsule_GetPointer( + this_module_capsule, nullptr); + assert(self->this_module); + + return 0; +} + +static PyObject* +ceph_get_mgr_id(BaseMgrStandbyModule *self, PyObject *args) +{ + return PyString_FromString(g_conf->name.get_id().c_str()); +} + +static PyObject* +ceph_config_get(BaseMgrStandbyModule *self, PyObject *args) +{ + char *what = nullptr; + if (!PyArg_ParseTuple(args, "s:ceph_config_get", &what)) { + derr << "Invalid args!" << dendl; + return nullptr; + } + + std::string value; + bool found = self->this_module->get_config(what, &value); + if (found) { + dout(10) << "ceph_config_get " << what << " found: " << value.c_str() << dendl; + return PyString_FromString(value.c_str()); + } else { + dout(4) << "ceph_config_get " << what << " not found " << dendl; + Py_RETURN_NONE; + } +} + +static PyObject* +ceph_get_active_uri(BaseMgrStandbyModule *self, PyObject *args) +{ + return PyString_FromString(self->this_module->get_active_uri().c_str()); +} + +static PyObject* +ceph_log(BaseMgrStandbyModule *self, PyObject *args) +{ + int level = 0; + char *record = nullptr; + if (!PyArg_ParseTuple(args, "is:log", &level, &record)) { + return nullptr; + } + + assert(self->this_module); + + self->this_module->log(level, record); + + Py_RETURN_NONE; +} + +PyMethodDef BaseMgrStandbyModule_methods[] = { + + {"_ceph_get_mgr_id", (PyCFunction)ceph_get_mgr_id, METH_NOARGS, + "Get the name of the Mgr daemon where we are running"}, + + {"_ceph_get_config", (PyCFunction)ceph_config_get, METH_VARARGS, + "Get a configuration value"}, + + {"_ceph_get_active_uri", (PyCFunction)ceph_get_active_uri, METH_NOARGS, + "Get the URI of the active instance of this module, if any"}, + + {"_ceph_log", (PyCFunction)ceph_log, METH_VARARGS, + "Emit a log message"}, + + {NULL, NULL, 0, NULL} +}; + +PyTypeObject BaseMgrStandbyModuleType = { + PyVarObject_HEAD_INIT(NULL, 0) + "ceph_module.BaseMgrStandbyModule", /* tp_name */ + sizeof(BaseMgrStandbyModule), /* tp_basicsize */ + 0, /* tp_itemsize */ + 0, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */ + "ceph-mgr Standby Python Plugin", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + BaseMgrStandbyModule_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)BaseMgrStandbyModule_init, /* tp_init */ + 0, /* tp_alloc */ + BaseMgrStandbyModule_new, /* tp_new */ +}; diff --git a/src/mgr/BaseMgrStandbyModule.h b/src/mgr/BaseMgrStandbyModule.h new file mode 100644 index 00000000000..c5c6beb1d3c --- /dev/null +++ b/src/mgr/BaseMgrStandbyModule.h @@ -0,0 +1,7 @@ + +#pragma once + +#include "Python.h" + +extern PyTypeObject BaseMgrStandbyModuleType; + diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index 474ddfdb1c8..7c252c91b57 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -391,7 +391,7 @@ void Mgr::shutdown() server.shutdown(); } // after the messenger is stopped, signal modules to shutdown via finisher - py_module_registry->shutdown(); + py_module_registry->active_shutdown(); })); // Then stop the finisher to ensure its enqueued contexts aren't going diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc index 323d3a58458..ef8be150b50 100644 --- a/src/mgr/MgrStandby.cc +++ b/src/mgr/MgrStandby.cc @@ -218,6 +218,8 @@ void MgrStandby::shutdown() // Expect already to be locked as we're called from signal handler assert(lock.is_locked_by_me()); + dout(4) << "Shutting down" << dendl; + py_module_registry.shutdown(); // stop sending beacon first, i use monc to talk with monitors @@ -348,10 +350,16 @@ void MgrStandby::handle_mgr_map(MMgrMap* mmap) dout(4) << "Map now says I am available" << dendl; available_in_map = true; } + } else if (active_mgr != nullptr) { + derr << "I was active but no longer am" << dendl; + respawn(); } else { - if (active_mgr != nullptr) { - derr << "I was active but no longer am" << dendl; - respawn(); + if (map.active_gid != 0 && map.active_name != g_conf->name.get_id()) { + // I am the standby and someone else is active, start modules + // in standby mode to do redirects if needed + if (!py_module_registry.is_standby_running()) { + py_module_registry.standby_start(&monc); + } } } @@ -433,6 +441,12 @@ int MgrStandby::main(vector args) std::string MgrStandby::state_str() { - return active_mgr == nullptr ? "standby" : "active"; + if (active_mgr == nullptr) { + return "standby"; + } else if (active_mgr->is_initialized()) { + return "active"; + } else { + return "active (starting)"; + } } diff --git a/src/mgr/PyModuleRegistry.cc b/src/mgr/PyModuleRegistry.cc index d2344ef4d35..117ee4bed8a 100644 --- a/src/mgr/PyModuleRegistry.cc +++ b/src/mgr/PyModuleRegistry.cc @@ -17,6 +17,7 @@ #include "BaseMgrModule.h" #include "PyOSDMap.h" +#include "BaseMgrStandbyModule.h" #include "Gil.h" #include "ActivePyModules.h" @@ -234,7 +235,7 @@ int PyModule::load(PyThreadState *pMainThreadState) Py_InitModule("ceph_osdmap_incremental", OSDMapIncrementalMethods); Py_InitModule("ceph_crushmap", CRUSHMapMethods); - // Initialize base class + // Initialize base classes BaseMgrModuleType.tp_new = PyType_GenericNew; if (PyType_Ready(&BaseMgrModuleType) < 0) { assert(0); @@ -243,6 +244,15 @@ int PyModule::load(PyThreadState *pMainThreadState) Py_INCREF(&BaseMgrModuleType); PyModule_AddObject(ceph_module, "BaseMgrModule", (PyObject *)&BaseMgrModuleType); + + BaseMgrModuleType.tp_new = PyType_GenericNew; + if (PyType_Ready(&BaseMgrStandbyModuleType) < 0) { + assert(0); + } + + Py_INCREF(&BaseMgrStandbyModuleType); + PyModule_AddObject(ceph_module, "BaseMgrStandbyModule", + (PyObject *)&BaseMgrStandbyModuleType); } // Environment is all good, import the external module @@ -256,25 +266,72 @@ int PyModule::load(PyThreadState *pMainThreadState) if (pModule == nullptr) { derr << "Module not found: '" << module_name << "'" << dendl; derr << handle_pyerror() << dendl; - - assert(0); return -ENOENT; } // Find the class // TODO: let them call it what they want instead of just 'Module' pClass = PyObject_GetAttrString(pModule, (const char*)"Module"); - Py_DECREF(pModule); if (pClass == nullptr) { derr << "Class not found in module '" << module_name << "'" << dendl; derr << handle_pyerror() << dendl; return -EINVAL; } + + pStandbyClass = PyObject_GetAttrString(pModule, + (const char*)"StandbyModule"); + if (pStandbyClass) { + dout(4) << "Standby mode available in module '" << module_name + << "'" << dendl; + } else { + dout(4) << "Standby mode not provided by module '" << module_name + << "'" << dendl; + PyErr_Clear(); + } + + Py_DECREF(pModule); } return 0; } +void PyModuleRegistry::standby_start(MonClient *monc) +{ + Mutex::Locker l(lock); + assert(active_modules == nullptr); + assert(standby_modules == nullptr); + assert(is_initialized()); + + dout(4) << "Starting modules in standby mode" << dendl; + + standby_modules.reset(new StandbyPyModules(monc, mgr_map)); + + std::set failed_modules; + for (const auto &i : modules) { + if (i.second->pStandbyClass) { + dout(4) << "starting module " << i.second->get_name() << dendl; + int r = standby_modules->start_one(i.first, + i.second->pStandbyClass, + i.second->pMyThreadState); + if (r != 0) { + derr << "failed to start module '" << i.second->get_name() + << "'" << dendl;; + failed_modules.insert(i.second->get_name()); + // Continue trying to load any other modules + } + } else { + dout(4) << "skipping module '" << i.second->get_name() << "' because " + "it does not implement a standby mode" << dendl; + } + } + + if (!failed_modules.empty()) { + clog->error() << "Failed to execute ceph-mgr module(s) in standby mode: " + << joinify(failed_modules.begin(), failed_modules.end(), + std::string(", ")); + } +} + void PyModuleRegistry::active_start( PyModuleConfig &config_, DaemonStateIndex &ds, ClusterState &cs, MonClient &mc, @@ -283,25 +340,46 @@ void PyModuleRegistry::active_start( { Mutex::Locker locker(lock); + dout(4) << "Starting modules in active mode" << dendl; + assert(active_modules == nullptr); + assert(is_initialized()); + + if (standby_modules != nullptr) { + standby_modules->shutdown(); + standby_modules.reset(); + } active_modules.reset(new ActivePyModules( config_, ds, cs, mc, clog_, objecter_, client_, f)); for (const auto &i : modules) { - active_modules->start_one(i.first, + dout(4) << "Starting " << i.first << dendl; + int r = active_modules->start_one(i.first, i.second->pClass, i.second->pMyThreadState); + assert(r == 0); // TODO } } -void PyModuleRegistry::shutdown() +void PyModuleRegistry::active_shutdown() { Mutex::Locker locker(lock); + if (active_modules != nullptr) { active_modules->shutdown(); active_modules.reset(); } +} + +void PyModuleRegistry::shutdown() +{ + Mutex::Locker locker(lock); + + if (standby_modules != nullptr) { + standby_modules->shutdown(); + standby_modules.reset(); + } modules.clear(); diff --git a/src/mgr/PyModuleRegistry.h b/src/mgr/PyModuleRegistry.h index 842e43f31ee..8249fc9e0d2 100644 --- a/src/mgr/PyModuleRegistry.h +++ b/src/mgr/PyModuleRegistry.h @@ -24,6 +24,7 @@ #include "common/LogClient.h" #include "ActivePyModules.h" +#include "StandbyPyModules.h" class PyModule { @@ -34,12 +35,18 @@ private: public: PyThreadState *pMyThreadState = nullptr; PyObject *pClass = nullptr; + PyObject *pStandbyClass = nullptr; PyModule(const std::string &module_name_) : module_name(module_name_) { } + int load(PyThreadState *pMainThreadState); + + std::string get_name() const { + return module_name; + } }; /** @@ -52,13 +59,14 @@ public: class PyModuleRegistry { private: + mutable Mutex lock{"PyModuleRegistry::lock"}; + LogChannelRef clog; std::map> modules; std::unique_ptr active_modules; - - mutable Mutex lock{"PyModuleRegistry::lock"}; + std::unique_ptr standby_modules; PyThreadState *pMainThreadState = nullptr; @@ -81,6 +89,11 @@ public: bool modules_changed = mgr_map_.modules != mgr_map.modules; mgr_map = mgr_map_; + + if (standby_modules != nullptr) { + standby_modules->handle_mgr_map(mgr_map_); + } + return modules_changed; } @@ -96,8 +109,15 @@ public: DaemonStateIndex &ds, ClusterState &cs, MonClient &mc, LogChannelRef clog_, Objecter &objecter_, Client &client_, Finisher &f); - void standby_start(); + void standby_start( + MonClient *monc); + + bool is_standby_running() const + { + return standby_modules != nullptr; + } + void active_shutdown(); void shutdown(); template diff --git a/src/mgr/StandbyPyModules.cc b/src/mgr/StandbyPyModules.cc new file mode 100644 index 00000000000..ef17dbbeb9c --- /dev/null +++ b/src/mgr/StandbyPyModules.cc @@ -0,0 +1,268 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "StandbyPyModules.h" + +#include "common/debug.h" + +#include "mgr/MgrContext.h" +#include "mgr/Gil.h" + + +#include +#include "include/assert.h" // boost clobbers this + +// For ::config_prefix +#include "PyModuleRegistry.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + +// Declaration fulfilled by ActivePyModules +std::string handle_pyerror(); + +void* PyModuleRunner::PyModuleRunnerThread::entry() +{ + // No need to acquire the GIL here; the module does it. + dout(4) << "Entering thread for " << mod->get_name() << dendl; + mod->serve(); + return nullptr; +} + +StandbyPyModules::StandbyPyModules(MonClient *monc_, const MgrMap &mgr_map_) + : monc(monc_), load_config_thread(monc, &state) +{ + state.set_mgr_map(mgr_map_); +} + +// FIXME: completely identical to ActivePyModules +void StandbyPyModules::shutdown() +{ + Mutex::Locker locker(lock); + + if (!state.is_config_loaded && load_config_thread.is_started()) { + // FIXME: handle cases where initial load races with shutdown + // this is actually not super rare because + assert(0); + //load_config_thread.kill(SIGKILL); + } + + // Signal modules to drop out of serve() and/or tear down resources + for (auto &i : modules) { + auto module = i.second.get(); + const auto& name = i.first; + dout(10) << "waiting for module " << name << " to shutdown" << dendl; + lock.Unlock(); + module->shutdown(); + lock.Lock(); + dout(10) << "module " << name << " shutdown" << dendl; + } + + // For modules implementing serve(), finish the threads where we + // were running that. + for (auto &i : modules) { + lock.Unlock(); + dout(10) << "joining thread for module " << i.first << dendl; + i.second->thread.join(); + dout(10) << "joined thread for module " << i.first << dendl; + lock.Lock(); + } + + modules.clear(); +} + +int StandbyPyModules::start_one(std::string const &module_name, + PyObject *pClass, PyThreadState *pMyThreadState) +{ + Mutex::Locker l(lock); + + assert(modules.count(module_name) == 0); + + modules[module_name].reset(new StandbyPyModule( + state, + module_name, pClass, + pMyThreadState)); + + if (modules.size() == 1) { + load_config_thread.create("LoadConfig"); + } + + int r = modules[module_name]->load(); + if (r != 0) { + modules.erase(module_name); + return r; + } else { + std::ostringstream thread_name; + thread_name << "mgr." << module_name; + dout(4) << "Starting thread for " << module_name << dendl; + modules[module_name]->thread.create(thread_name.str().c_str()); + return 0; + } +} + +int StandbyPyModule::load() +{ + Gil gil(pMyThreadState); + + // We tell the module how we name it, so that it can be consistent + // with us in logging etc. + auto pThisPtr = PyCapsule_New(this, nullptr, nullptr); + assert(pThisPtr != nullptr); + auto pModuleName = PyString_FromString(module_name.c_str()); + assert(pModuleName != nullptr); + auto pArgs = PyTuple_Pack(2, pModuleName, pThisPtr); + Py_DECREF(pThisPtr); + Py_DECREF(pModuleName); + + pClassInstance = PyObject_CallObject(pClass, pArgs); + Py_DECREF(pArgs); + if (pClassInstance == nullptr) { + derr << "Failed to construct class in '" << module_name << "'" << dendl; + derr << handle_pyerror() << dendl; + return -EINVAL; + } else { + dout(1) << "Constructed class from module: " << module_name << dendl; + return 0; + } +} + +StandbyPyModule::~StandbyPyModule() +{ + Gil gil(pMyThreadState); + + if (pClassInstance) { + Py_XDECREF(pClassInstance); + pClassInstance = nullptr; + } + + Py_DECREF(pClass); + pClass = nullptr; +} + +void *StandbyPyModules::LoadConfigThread::entry() +{ + dout(10) << "listing keys" << dendl; + JSONCommand cmd; + cmd.run(monc, "{\"prefix\": \"config-key ls\"}"); + cmd.wait(); + assert(cmd.r == 0); + + std::map loaded; + + for (auto &key_str : cmd.json_result.get_array()) { + std::string const key = key_str.get_str(); + dout(20) << "saw key '" << key << "'" << dendl; + + const std::string config_prefix = PyModuleRegistry::config_prefix; + + if (key.substr(0, config_prefix.size()) == config_prefix) { + dout(20) << "fetching '" << key << "'" << dendl; + Command get_cmd; + std::ostringstream cmd_json; + cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}"; + get_cmd.run(monc, cmd_json.str()); + get_cmd.wait(); + assert(get_cmd.r == 0); + loaded[key] = get_cmd.outbl.to_str(); + } + } + state->loaded_config(loaded); + + return nullptr; +} + +bool StandbyPyModule::get_config(const std::string &key, + std::string *value) const +{ + PyThreadState *tstate = PyEval_SaveThread(); + PyEval_RestoreThread(tstate); + + const std::string global_key = PyModuleRegistry::config_prefix + + module_name + "/" + key; + + dout(4) << __func__ << "key: " << global_key << dendl; + + return state.with_config([global_key, value](const PyModuleConfig &config){ + if (config.count(global_key)) { + *value = config.at(global_key); + return true; + } else { + return false; + } + }); +} + +std::string StandbyPyModule::get_active_uri() const +{ + std::string result; + state.with_mgr_map([&result, this](const MgrMap &mgr_map){ + auto iter = mgr_map.services.find(module_name); + if (iter != mgr_map.services.end()) { + result = iter->second; + } + }); + + return result; +} + +int PyModuleRunner::serve() +{ + assert(pClassInstance != nullptr); + + // This method is called from a separate OS thread (i.e. a thread not + // created by Python), so tell Gil to wrap this in a new thread state. + Gil gil(pMyThreadState, true); + + auto pValue = PyObject_CallMethod(pClassInstance, + const_cast("serve"), nullptr); + + int r = 0; + if (pValue != NULL) { + Py_DECREF(pValue); + } else { + derr << module_name << ".serve:" << dendl; + derr << handle_pyerror() << dendl; + return -EINVAL; + } + + return r; +} + +void PyModuleRunner::shutdown() +{ + assert(pClassInstance != nullptr); + + Gil gil(pMyThreadState); + + auto pValue = PyObject_CallMethod(pClassInstance, + const_cast("shutdown"), nullptr); + + if (pValue != NULL) { + Py_DECREF(pValue); + } else { + derr << "Failed to invoke shutdown() on " << module_name << dendl; + derr << handle_pyerror() << dendl; + } +} + +void PyModuleRunner::log(int level, const std::string &record) +{ +#undef dout_prefix +#define dout_prefix *_dout << "mgr[" << module_name << "] " + dout(level) << record << dendl; +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " +} + diff --git a/src/mgr/StandbyPyModules.h b/src/mgr/StandbyPyModules.h new file mode 100644 index 00000000000..4b2e47a6127 --- /dev/null +++ b/src/mgr/StandbyPyModules.h @@ -0,0 +1,202 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#pragma once + +#include "Python.h" + +#include +#include + +#include "common/Thread.h" +#include "common/Mutex.h" + +#include "mon/MonClient.h" +#include "mon/MgrMap.h" + +typedef std::map PyModuleConfig; + +/** + * State that is read by all modules running in standby mode + */ +class StandbyPyModuleState +{ + mutable Mutex lock{"StandbyPyModuleState::lock"}; + + MgrMap mgr_map; + PyModuleConfig config_cache; + + mutable Cond config_loaded; + +public: + + bool is_config_loaded = false; + + void set_mgr_map(const MgrMap &mgr_map_) + { + Mutex::Locker l(lock); + + mgr_map = mgr_map_; + } + + void loaded_config(const PyModuleConfig &config_) + { + Mutex::Locker l(lock); + + config_cache = config_; + is_config_loaded = true; + config_loaded.Signal(); + } + + template + void with_mgr_map(Callback&& cb, Args&&...args) const + { + Mutex::Locker l(lock); + std::forward(cb)(mgr_map, std::forward(args)...); + } + + template + auto with_config(Callback&& cb, Args&&... args) const -> + decltype(cb(config_cache, std::forward(args)...)) { + Mutex::Locker l(lock); + + if (!is_config_loaded) { + config_loaded.Wait(lock); + } + + return std::forward(cb)(config_cache, std::forward(args)...); + } +}; + +/** + * Implement the pattern of calling serve() on a module in a thread, + * until shutdown() is called. + */ +class PyModuleRunner +{ +protected: + const std::string module_name; + + // Passed in by whoever loaded our python module and looked up + // the symbols in it. + PyObject *pClass = nullptr; + + // Passed in by whoever created our subinterpreter for us + PyThreadState *pMyThreadState = nullptr; + + // Populated when we construct our instance of pClass in load() + PyObject *pClassInstance = nullptr; + + class PyModuleRunnerThread : public Thread + { + PyModuleRunner *mod; + + public: + PyModuleRunnerThread(PyModuleRunner *mod_) + : mod(mod_) {} + + void *entry() override; + }; + +public: + int serve(); + void shutdown(); + void log(int level, const std::string &record); + + PyModuleRunner( + const std::string &module_name_, + PyObject *pClass_, + PyThreadState *pMyThreadState_) + : + module_name(module_name_), + pClass(pClass_), pMyThreadState(pMyThreadState_), + thread(this) + { + assert(pClass != nullptr); + assert(pMyThreadState != nullptr); + assert(!module_name.empty()); + } + + PyModuleRunnerThread thread; + + std::string const &get_name() { return module_name; } +}; + +class StandbyPyModule : public PyModuleRunner +{ + StandbyPyModuleState &state; + + public: + + StandbyPyModule( + StandbyPyModuleState &state_, + const std::string &module_name_, + PyObject *pClass_, + PyThreadState *pMyThreadState_) + : + PyModuleRunner(module_name_, pClass_, pMyThreadState_), + state(state_) + { + } + + ~StandbyPyModule(); + + bool get_config(const std::string &key, std::string *value) const; + std::string get_active_uri() const; + + int load(); +}; + +class StandbyPyModules +{ +private: + mutable Mutex lock{"StandbyPyModules::lock"}; + std::map> modules; + + MonClient *monc; + + StandbyPyModuleState state; + + void load_config(); + class LoadConfigThread : public Thread + { + protected: + MonClient *monc; + StandbyPyModuleState *state; + public: + LoadConfigThread(MonClient *monc_, StandbyPyModuleState *state_) + : monc(monc_), state(state_) + {} + void *entry() override; + }; + + LoadConfigThread load_config_thread; + +public: + + StandbyPyModules( + MonClient *monc_, + const MgrMap &mgr_map_); + + int start_one(std::string const &module_name, + PyObject *pClass, + PyThreadState *pMyThreadState); + + void shutdown(); + + void handle_mgr_map(const MgrMap &mgr_map) + { + state.set_mgr_map(mgr_map); + } + +}; -- 2.39.5