#include "include/assert.h" // boost clobbers this
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "
// decode a Python exception into a string
}
-
-
void *ServeThread::entry()
{
// No need to acquire the GIL here; the module does it.
}
}
-void ActivePyModules::start_one(std::string const &module_name,
+int ActivePyModules::start_one(std::string const &module_name,
PyObject *pClass, PyThreadState *pMyThreadState)
{
Mutex::Locker l(lock);
pMyThreadState));
int r = modules[module_name]->load(this);
+ if (r != 0) {
+ return r;
+ } else {
+ std::ostringstream thread_name;
+ thread_name << "mgr." << module_name;
+ dout(4) << "Starting thread for " << module_name << dendl;
+ modules[module_name]->thread.create(thread_name.str().c_str());
- // FIXME error handling
- assert(r == 0);
-
- std::ostringstream thread_name;
- thread_name << "mgr." << module_name;
- dout(4) << "Starting thread for " << module_name << dendl;
- modules[module_name]->thread.create(thread_name.str().c_str());
+ return 0;
+ }
}
void ActivePyModules::shutdown()
int init();
void shutdown();
- void start_one(std::string const &module_name,
- PyObject *pClass,
- PyThreadState *pMyThreadState);
+ int start_one(std::string const &module_name,
+ PyObject *pClass,
+ PyThreadState *pMyThreadState);
void dump_server(const std::string &hostname,
const DaemonStateCollection &dmc,
* available as methods on that object.
*/
+#include "Python.h"
+
#include "Mgr.h"
#include "mon/MonClient.h"
-#ifndef PYSTATE_H_
-#define PYSTATE_H_
+
+#pragma once
#include "Python.h"
extern PyTypeObject BaseMgrModuleType;
-#endif
-
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#include "BaseMgrStandbyModule.h"
+
+#include "StandbyPyModules.h"
+
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+
+typedef struct {
+ PyObject_HEAD
+ StandbyPyModule *this_module;
+} BaseMgrStandbyModule;
+
+static PyObject *
+BaseMgrStandbyModule_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ BaseMgrStandbyModule *self;
+
+ self = (BaseMgrStandbyModule *)type->tp_alloc(type, 0);
+
+ return (PyObject *)self;
+}
+
+static int
+BaseMgrStandbyModule_init(BaseMgrStandbyModule *self, PyObject *args, PyObject *kwds)
+{
+ PyObject *this_module_capsule = nullptr;
+ static const char *kwlist[] = {"this_module", NULL};
+
+ if (! PyArg_ParseTupleAndKeywords(args, kwds, "O",
+ const_cast<char**>(kwlist),
+ &this_module_capsule)) {
+ return -1;
+ }
+
+ self->this_module = (StandbyPyModule*)PyCapsule_GetPointer(
+ this_module_capsule, nullptr);
+ assert(self->this_module);
+
+ return 0;
+}
+
+static PyObject*
+ceph_get_mgr_id(BaseMgrStandbyModule *self, PyObject *args)
+{
+ return PyString_FromString(g_conf->name.get_id().c_str());
+}
+
+static PyObject*
+ceph_config_get(BaseMgrStandbyModule *self, PyObject *args)
+{
+ char *what = nullptr;
+ if (!PyArg_ParseTuple(args, "s:ceph_config_get", &what)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+
+ std::string value;
+ bool found = self->this_module->get_config(what, &value);
+ if (found) {
+ dout(10) << "ceph_config_get " << what << " found: " << value.c_str() << dendl;
+ return PyString_FromString(value.c_str());
+ } else {
+ dout(4) << "ceph_config_get " << what << " not found " << dendl;
+ Py_RETURN_NONE;
+ }
+}
+
+static PyObject*
+ceph_get_active_uri(BaseMgrStandbyModule *self, PyObject *args)
+{
+ return PyString_FromString(self->this_module->get_active_uri().c_str());
+}
+
+static PyObject*
+ceph_log(BaseMgrStandbyModule *self, PyObject *args)
+{
+ int level = 0;
+ char *record = nullptr;
+ if (!PyArg_ParseTuple(args, "is:log", &level, &record)) {
+ return nullptr;
+ }
+
+ assert(self->this_module);
+
+ self->this_module->log(level, record);
+
+ Py_RETURN_NONE;
+}
+
+PyMethodDef BaseMgrStandbyModule_methods[] = {
+
+ {"_ceph_get_mgr_id", (PyCFunction)ceph_get_mgr_id, METH_NOARGS,
+ "Get the name of the Mgr daemon where we are running"},
+
+ {"_ceph_get_config", (PyCFunction)ceph_config_get, METH_VARARGS,
+ "Get a configuration value"},
+
+ {"_ceph_get_active_uri", (PyCFunction)ceph_get_active_uri, METH_NOARGS,
+ "Get the URI of the active instance of this module, if any"},
+
+ {"_ceph_log", (PyCFunction)ceph_log, METH_VARARGS,
+ "Emit a log message"},
+
+ {NULL, NULL, 0, NULL}
+};
+
+PyTypeObject BaseMgrStandbyModuleType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "ceph_module.BaseMgrStandbyModule", /* tp_name */
+ sizeof(BaseMgrStandbyModule), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ 0, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ "ceph-mgr Standby Python Plugin", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ BaseMgrStandbyModule_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc)BaseMgrStandbyModule_init, /* tp_init */
+ 0, /* tp_alloc */
+ BaseMgrStandbyModule_new, /* tp_new */
+};
--- /dev/null
+
+#pragma once
+
+#include "Python.h"
+
+extern PyTypeObject BaseMgrStandbyModuleType;
+
server.shutdown();
}
// after the messenger is stopped, signal modules to shutdown via finisher
- py_module_registry->shutdown();
+ py_module_registry->active_shutdown();
}));
// Then stop the finisher to ensure its enqueued contexts aren't going
// Expect already to be locked as we're called from signal handler
assert(lock.is_locked_by_me());
+ dout(4) << "Shutting down" << dendl;
+
py_module_registry.shutdown();
// stop sending beacon first, i use monc to talk with monitors
dout(4) << "Map now says I am available" << dendl;
available_in_map = true;
}
+ } else if (active_mgr != nullptr) {
+ derr << "I was active but no longer am" << dendl;
+ respawn();
} else {
- if (active_mgr != nullptr) {
- derr << "I was active but no longer am" << dendl;
- respawn();
+ if (map.active_gid != 0 && map.active_name != g_conf->name.get_id()) {
+ // I am the standby and someone else is active, start modules
+ // in standby mode to do redirects if needed
+ if (!py_module_registry.is_standby_running()) {
+ py_module_registry.standby_start(&monc);
+ }
}
}
std::string MgrStandby::state_str()
{
- return active_mgr == nullptr ? "standby" : "active";
+ if (active_mgr == nullptr) {
+ return "standby";
+ } else if (active_mgr->is_initialized()) {
+ return "active";
+ } else {
+ return "active (starting)";
+ }
}
#include "BaseMgrModule.h"
#include "PyOSDMap.h"
+#include "BaseMgrStandbyModule.h"
#include "Gil.h"
#include "ActivePyModules.h"
Py_InitModule("ceph_osdmap_incremental", OSDMapIncrementalMethods);
Py_InitModule("ceph_crushmap", CRUSHMapMethods);
- // Initialize base class
+ // Initialize base classes
BaseMgrModuleType.tp_new = PyType_GenericNew;
if (PyType_Ready(&BaseMgrModuleType) < 0) {
assert(0);
Py_INCREF(&BaseMgrModuleType);
PyModule_AddObject(ceph_module, "BaseMgrModule",
(PyObject *)&BaseMgrModuleType);
+
+ BaseMgrModuleType.tp_new = PyType_GenericNew;
+ if (PyType_Ready(&BaseMgrStandbyModuleType) < 0) {
+ assert(0);
+ }
+
+ Py_INCREF(&BaseMgrStandbyModuleType);
+ PyModule_AddObject(ceph_module, "BaseMgrStandbyModule",
+ (PyObject *)&BaseMgrStandbyModuleType);
}
// Environment is all good, import the external module
if (pModule == nullptr) {
derr << "Module not found: '" << module_name << "'" << dendl;
derr << handle_pyerror() << dendl;
-
- assert(0);
return -ENOENT;
}
// Find the class
// TODO: let them call it what they want instead of just 'Module'
pClass = PyObject_GetAttrString(pModule, (const char*)"Module");
- Py_DECREF(pModule);
if (pClass == nullptr) {
derr << "Class not found in module '" << module_name << "'" << dendl;
derr << handle_pyerror() << dendl;
return -EINVAL;
}
+
+ pStandbyClass = PyObject_GetAttrString(pModule,
+ (const char*)"StandbyModule");
+ if (pStandbyClass) {
+ dout(4) << "Standby mode available in module '" << module_name
+ << "'" << dendl;
+ } else {
+ dout(4) << "Standby mode not provided by module '" << module_name
+ << "'" << dendl;
+ PyErr_Clear();
+ }
+
+ Py_DECREF(pModule);
}
return 0;
}
+void PyModuleRegistry::standby_start(MonClient *monc)
+{
+ Mutex::Locker l(lock);
+ assert(active_modules == nullptr);
+ assert(standby_modules == nullptr);
+ assert(is_initialized());
+
+ dout(4) << "Starting modules in standby mode" << dendl;
+
+ standby_modules.reset(new StandbyPyModules(monc, mgr_map));
+
+ std::set<std::string> failed_modules;
+ for (const auto &i : modules) {
+ if (i.second->pStandbyClass) {
+ dout(4) << "starting module " << i.second->get_name() << dendl;
+ int r = standby_modules->start_one(i.first,
+ i.second->pStandbyClass,
+ i.second->pMyThreadState);
+ if (r != 0) {
+ derr << "failed to start module '" << i.second->get_name()
+ << "'" << dendl;;
+ failed_modules.insert(i.second->get_name());
+ // Continue trying to load any other modules
+ }
+ } else {
+ dout(4) << "skipping module '" << i.second->get_name() << "' because "
+ "it does not implement a standby mode" << dendl;
+ }
+ }
+
+ if (!failed_modules.empty()) {
+ clog->error() << "Failed to execute ceph-mgr module(s) in standby mode: "
+ << joinify(failed_modules.begin(), failed_modules.end(),
+ std::string(", "));
+ }
+}
+
void PyModuleRegistry::active_start(
PyModuleConfig &config_,
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
{
Mutex::Locker locker(lock);
+ dout(4) << "Starting modules in active mode" << dendl;
+
assert(active_modules == nullptr);
+ assert(is_initialized());
+
+ if (standby_modules != nullptr) {
+ standby_modules->shutdown();
+ standby_modules.reset();
+ }
active_modules.reset(new ActivePyModules(
config_, ds, cs, mc, clog_, objecter_, client_, f));
for (const auto &i : modules) {
- active_modules->start_one(i.first,
+ dout(4) << "Starting " << i.first << dendl;
+ int r = active_modules->start_one(i.first,
i.second->pClass,
i.second->pMyThreadState);
+ assert(r == 0); // TODO
}
}
-void PyModuleRegistry::shutdown()
+void PyModuleRegistry::active_shutdown()
{
Mutex::Locker locker(lock);
+
if (active_modules != nullptr) {
active_modules->shutdown();
active_modules.reset();
}
+}
+
+void PyModuleRegistry::shutdown()
+{
+ Mutex::Locker locker(lock);
+
+ if (standby_modules != nullptr) {
+ standby_modules->shutdown();
+ standby_modules.reset();
+ }
modules.clear();
#include "common/LogClient.h"
#include "ActivePyModules.h"
+#include "StandbyPyModules.h"
class PyModule
{
public:
PyThreadState *pMyThreadState = nullptr;
PyObject *pClass = nullptr;
+ PyObject *pStandbyClass = nullptr;
PyModule(const std::string &module_name_)
: module_name(module_name_)
{
}
+
int load(PyThreadState *pMainThreadState);
+
+ std::string get_name() const {
+ return module_name;
+ }
};
/**
class PyModuleRegistry
{
private:
+ mutable Mutex lock{"PyModuleRegistry::lock"};
+
LogChannelRef clog;
std::map<std::string, std::unique_ptr<PyModule>> modules;
std::unique_ptr<ActivePyModules> active_modules;
-
- mutable Mutex lock{"PyModuleRegistry::lock"};
+ std::unique_ptr<StandbyPyModules> standby_modules;
PyThreadState *pMainThreadState = nullptr;
bool modules_changed = mgr_map_.modules != mgr_map.modules;
mgr_map = mgr_map_;
+
+ if (standby_modules != nullptr) {
+ standby_modules->handle_mgr_map(mgr_map_);
+ }
+
return modules_changed;
}
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
LogChannelRef clog_, Objecter &objecter_, Client &client_,
Finisher &f);
- void standby_start();
+ void standby_start(
+ MonClient *monc);
+
+ bool is_standby_running() const
+ {
+ return standby_modules != nullptr;
+ }
+ void active_shutdown();
void shutdown();
template<typename Callback, typename...Args>
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "StandbyPyModules.h"
+
+#include "common/debug.h"
+
+#include "mgr/MgrContext.h"
+#include "mgr/Gil.h"
+
+
+#include <boost/python.hpp>
+#include "include/assert.h" // boost clobbers this
+
+// For ::config_prefix
+#include "PyModuleRegistry.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+// Declaration fulfilled by ActivePyModules
+std::string handle_pyerror();
+
+void* PyModuleRunner::PyModuleRunnerThread::entry()
+{
+ // No need to acquire the GIL here; the module does it.
+ dout(4) << "Entering thread for " << mod->get_name() << dendl;
+ mod->serve();
+ return nullptr;
+}
+
+StandbyPyModules::StandbyPyModules(MonClient *monc_, const MgrMap &mgr_map_)
+ : monc(monc_), load_config_thread(monc, &state)
+{
+ state.set_mgr_map(mgr_map_);
+}
+
+// FIXME: completely identical to ActivePyModules
+void StandbyPyModules::shutdown()
+{
+ Mutex::Locker locker(lock);
+
+ if (!state.is_config_loaded && load_config_thread.is_started()) {
+ // FIXME: handle cases where initial load races with shutdown
+ // this is actually not super rare because
+ assert(0);
+ //load_config_thread.kill(SIGKILL);
+ }
+
+ // Signal modules to drop out of serve() and/or tear down resources
+ for (auto &i : modules) {
+ auto module = i.second.get();
+ const auto& name = i.first;
+ dout(10) << "waiting for module " << name << " to shutdown" << dendl;
+ lock.Unlock();
+ module->shutdown();
+ lock.Lock();
+ dout(10) << "module " << name << " shutdown" << dendl;
+ }
+
+ // For modules implementing serve(), finish the threads where we
+ // were running that.
+ for (auto &i : modules) {
+ lock.Unlock();
+ dout(10) << "joining thread for module " << i.first << dendl;
+ i.second->thread.join();
+ dout(10) << "joined thread for module " << i.first << dendl;
+ lock.Lock();
+ }
+
+ modules.clear();
+}
+
+int StandbyPyModules::start_one(std::string const &module_name,
+ PyObject *pClass, PyThreadState *pMyThreadState)
+{
+ Mutex::Locker l(lock);
+
+ assert(modules.count(module_name) == 0);
+
+ modules[module_name].reset(new StandbyPyModule(
+ state,
+ module_name, pClass,
+ pMyThreadState));
+
+ if (modules.size() == 1) {
+ load_config_thread.create("LoadConfig");
+ }
+
+ int r = modules[module_name]->load();
+ if (r != 0) {
+ modules.erase(module_name);
+ return r;
+ } else {
+ std::ostringstream thread_name;
+ thread_name << "mgr." << module_name;
+ dout(4) << "Starting thread for " << module_name << dendl;
+ modules[module_name]->thread.create(thread_name.str().c_str());
+ return 0;
+ }
+}
+
+int StandbyPyModule::load()
+{
+ Gil gil(pMyThreadState);
+
+ // We tell the module how we name it, so that it can be consistent
+ // with us in logging etc.
+ auto pThisPtr = PyCapsule_New(this, nullptr, nullptr);
+ assert(pThisPtr != nullptr);
+ auto pModuleName = PyString_FromString(module_name.c_str());
+ assert(pModuleName != nullptr);
+ auto pArgs = PyTuple_Pack(2, pModuleName, pThisPtr);
+ Py_DECREF(pThisPtr);
+ Py_DECREF(pModuleName);
+
+ pClassInstance = PyObject_CallObject(pClass, pArgs);
+ Py_DECREF(pArgs);
+ if (pClassInstance == nullptr) {
+ derr << "Failed to construct class in '" << module_name << "'" << dendl;
+ derr << handle_pyerror() << dendl;
+ return -EINVAL;
+ } else {
+ dout(1) << "Constructed class from module: " << module_name << dendl;
+ return 0;
+ }
+}
+
+StandbyPyModule::~StandbyPyModule()
+{
+ Gil gil(pMyThreadState);
+
+ if (pClassInstance) {
+ Py_XDECREF(pClassInstance);
+ pClassInstance = nullptr;
+ }
+
+ Py_DECREF(pClass);
+ pClass = nullptr;
+}
+
+void *StandbyPyModules::LoadConfigThread::entry()
+{
+ dout(10) << "listing keys" << dendl;
+ JSONCommand cmd;
+ cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
+ cmd.wait();
+ assert(cmd.r == 0);
+
+ std::map<std::string, std::string> loaded;
+
+ for (auto &key_str : cmd.json_result.get_array()) {
+ std::string const key = key_str.get_str();
+ dout(20) << "saw key '" << key << "'" << dendl;
+
+ const std::string config_prefix = PyModuleRegistry::config_prefix;
+
+ if (key.substr(0, config_prefix.size()) == config_prefix) {
+ dout(20) << "fetching '" << key << "'" << dendl;
+ Command get_cmd;
+ std::ostringstream cmd_json;
+ cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
+ get_cmd.run(monc, cmd_json.str());
+ get_cmd.wait();
+ assert(get_cmd.r == 0);
+ loaded[key] = get_cmd.outbl.to_str();
+ }
+ }
+ state->loaded_config(loaded);
+
+ return nullptr;
+}
+
+bool StandbyPyModule::get_config(const std::string &key,
+ std::string *value) const
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ PyEval_RestoreThread(tstate);
+
+ const std::string global_key = PyModuleRegistry::config_prefix
+ + module_name + "/" + key;
+
+ dout(4) << __func__ << "key: " << global_key << dendl;
+
+ return state.with_config([global_key, value](const PyModuleConfig &config){
+ if (config.count(global_key)) {
+ *value = config.at(global_key);
+ return true;
+ } else {
+ return false;
+ }
+ });
+}
+
+std::string StandbyPyModule::get_active_uri() const
+{
+ std::string result;
+ state.with_mgr_map([&result, this](const MgrMap &mgr_map){
+ auto iter = mgr_map.services.find(module_name);
+ if (iter != mgr_map.services.end()) {
+ result = iter->second;
+ }
+ });
+
+ return result;
+}
+
+int PyModuleRunner::serve()
+{
+ assert(pClassInstance != nullptr);
+
+ // This method is called from a separate OS thread (i.e. a thread not
+ // created by Python), so tell Gil to wrap this in a new thread state.
+ Gil gil(pMyThreadState, true);
+
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("serve"), nullptr);
+
+ int r = 0;
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ derr << module_name << ".serve:" << dendl;
+ derr << handle_pyerror() << dendl;
+ return -EINVAL;
+ }
+
+ return r;
+}
+
+void PyModuleRunner::shutdown()
+{
+ assert(pClassInstance != nullptr);
+
+ Gil gil(pMyThreadState);
+
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("shutdown"), nullptr);
+
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ derr << "Failed to invoke shutdown() on " << module_name << dendl;
+ derr << handle_pyerror() << dendl;
+ }
+}
+
+void PyModuleRunner::log(int level, const std::string &record)
+{
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[" << module_name << "] "
+ dout(level) << record << dendl;
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#pragma once
+
+#include "Python.h"
+
+#include <string>
+#include <map>
+
+#include "common/Thread.h"
+#include "common/Mutex.h"
+
+#include "mon/MonClient.h"
+#include "mon/MgrMap.h"
+
+typedef std::map<std::string, std::string> PyModuleConfig;
+
+/**
+ * State that is read by all modules running in standby mode
+ */
+class StandbyPyModuleState
+{
+ mutable Mutex lock{"StandbyPyModuleState::lock"};
+
+ MgrMap mgr_map;
+ PyModuleConfig config_cache;
+
+ mutable Cond config_loaded;
+
+public:
+
+ bool is_config_loaded = false;
+
+ void set_mgr_map(const MgrMap &mgr_map_)
+ {
+ Mutex::Locker l(lock);
+
+ mgr_map = mgr_map_;
+ }
+
+ void loaded_config(const PyModuleConfig &config_)
+ {
+ Mutex::Locker l(lock);
+
+ config_cache = config_;
+ is_config_loaded = true;
+ config_loaded.Signal();
+ }
+
+ template<typename Callback, typename...Args>
+ void with_mgr_map(Callback&& cb, Args&&...args) const
+ {
+ Mutex::Locker l(lock);
+ std::forward<Callback>(cb)(mgr_map, std::forward<Args>(args)...);
+ }
+
+ template<typename Callback, typename...Args>
+ auto with_config(Callback&& cb, Args&&... args) const ->
+ decltype(cb(config_cache, std::forward<Args>(args)...)) {
+ Mutex::Locker l(lock);
+
+ if (!is_config_loaded) {
+ config_loaded.Wait(lock);
+ }
+
+ return std::forward<Callback>(cb)(config_cache, std::forward<Args>(args)...);
+ }
+};
+
+/**
+ * Implement the pattern of calling serve() on a module in a thread,
+ * until shutdown() is called.
+ */
+class PyModuleRunner
+{
+protected:
+ const std::string module_name;
+
+ // Passed in by whoever loaded our python module and looked up
+ // the symbols in it.
+ PyObject *pClass = nullptr;
+
+ // Passed in by whoever created our subinterpreter for us
+ PyThreadState *pMyThreadState = nullptr;
+
+ // Populated when we construct our instance of pClass in load()
+ PyObject *pClassInstance = nullptr;
+
+ class PyModuleRunnerThread : public Thread
+ {
+ PyModuleRunner *mod;
+
+ public:
+ PyModuleRunnerThread(PyModuleRunner *mod_)
+ : mod(mod_) {}
+
+ void *entry() override;
+ };
+
+public:
+ int serve();
+ void shutdown();
+ void log(int level, const std::string &record);
+
+ PyModuleRunner(
+ const std::string &module_name_,
+ PyObject *pClass_,
+ PyThreadState *pMyThreadState_)
+ :
+ module_name(module_name_),
+ pClass(pClass_), pMyThreadState(pMyThreadState_),
+ thread(this)
+ {
+ assert(pClass != nullptr);
+ assert(pMyThreadState != nullptr);
+ assert(!module_name.empty());
+ }
+
+ PyModuleRunnerThread thread;
+
+ std::string const &get_name() { return module_name; }
+};
+
+class StandbyPyModule : public PyModuleRunner
+{
+ StandbyPyModuleState &state;
+
+ public:
+
+ StandbyPyModule(
+ StandbyPyModuleState &state_,
+ const std::string &module_name_,
+ PyObject *pClass_,
+ PyThreadState *pMyThreadState_)
+ :
+ PyModuleRunner(module_name_, pClass_, pMyThreadState_),
+ state(state_)
+ {
+ }
+
+ ~StandbyPyModule();
+
+ bool get_config(const std::string &key, std::string *value) const;
+ std::string get_active_uri() const;
+
+ int load();
+};
+
+class StandbyPyModules
+{
+private:
+ mutable Mutex lock{"StandbyPyModules::lock"};
+ std::map<std::string, std::unique_ptr<StandbyPyModule>> modules;
+
+ MonClient *monc;
+
+ StandbyPyModuleState state;
+
+ void load_config();
+ class LoadConfigThread : public Thread
+ {
+ protected:
+ MonClient *monc;
+ StandbyPyModuleState *state;
+ public:
+ LoadConfigThread(MonClient *monc_, StandbyPyModuleState *state_)
+ : monc(monc_), state(state_)
+ {}
+ void *entry() override;
+ };
+
+ LoadConfigThread load_config_thread;
+
+public:
+
+ StandbyPyModules(
+ MonClient *monc_,
+ const MgrMap &mgr_map_);
+
+ int start_one(std::string const &module_name,
+ PyObject *pClass,
+ PyThreadState *pMyThreadState);
+
+ void shutdown();
+
+ void handle_mgr_map(const MgrMap &mgr_map)
+ {
+ state.set_mgr_map(mgr_map);
+ }
+
+};