}
}
-int ActivePyModules::start_one(PyModuleRef py_module)
+void ActivePyModules::start_one(PyModuleRef py_module)
{
std::lock_guard l(lock);
ceph_assert(modules.count(py_module->get_name()) == 0);
- modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog));
- auto active_module = modules.at(py_module->get_name()).get();
-
- int r = active_module->load(this);
- if (r != 0) {
- // the class instance wasn't created... remove it from the set of activated
- // modules so commands and notifications aren't delivered.
- modules.erase(py_module->get_name());
- return r;
- } else {
- dout(4) << "Starting thread for " << py_module->get_name() << dendl;
- active_module->thread.create(active_module->get_thread_name());
-
- return 0;
- }
+ const auto name = py_module->get_name();
+ modules[name].reset(new ActivePyModule(py_module, clog));
+ auto active_module = modules.at(name).get();
+
+ // Send all python calls down a Finisher to avoid blocking
+ // C++ code, and avoid any potential lock cycles.
+ finisher.queue(new FunctionContext([this, active_module, name](int) {
+ int r = active_module->load(this);
+ if (r != 0) {
+ derr << "Failed to run module in active mode ('" << name << "')"
+ << dendl;
+ std::lock_guard l(lock);
+ modules.erase(name);
+ } else {
+ dout(4) << "Starting thread for " << name << dendl;
+ active_module->thread.create(active_module->get_thread_name());
+ }
+ }));
}
void ActivePyModules::shutdown()
int init();
void shutdown();
- int start_one(PyModuleRef py_module);
+ void start_one(PyModuleRef py_module);
void dump_server(const std::string &hostname,
const DaemonStateCollection &dmc,
clog(log_client.create_channel(CLOG_CHANNEL_CLUSTER)),
audit_clog(log_client.create_channel(CLOG_CHANNEL_AUDIT)),
lock("MgrStandby::lock"),
+ finisher(g_ceph_context, "MgrStandby", "mgrsb-fin"),
timer(g_ceph_context, lock),
py_module_registry(clog),
active_mgr(nullptr),
std::lock_guard l(lock);
+ // Start finisher
+ finisher.start();
+
// Initialize Messenger
client_messenger->add_dispatcher_tail(this);
client_messenger->add_dispatcher_head(&objecter);
void MgrStandby::handle_signal(int signum)
{
- std::lock_guard l(lock);
ceph_assert(signum == SIGINT || signum == SIGTERM);
derr << "*** Got signal " << sig_str(signum) << " ***" << dendl;
shutdown();
void MgrStandby::shutdown()
{
- // Expect already to be locked as we're called from signal handler
- ceph_assert(lock.is_locked_by_me());
-
- dout(4) << "Shutting down" << dendl;
+ finisher.queue(new FunctionContext([&](int) {
+ std::lock_guard l(lock);
+
+ dout(4) << "Shutting down" << dendl;
+
+ // stop sending beacon first, i use monc to talk with monitors
+ timer.shutdown();
+ // client uses monc and objecter
+ client.shutdown();
+ mgrc.shutdown();
+ // stop monc, so mon won't be able to instruct me to shutdown/activate after
+ // the active_mgr is stopped
+ monc.shutdown();
+ if (active_mgr) {
+ active_mgr->shutdown();
+ }
- // stop sending beacon first, i use monc to talk with monitors
- timer.shutdown();
- // client uses monc and objecter
- client.shutdown();
- mgrc.shutdown();
- // stop monc, so mon won't be able to instruct me to shutdown/activate after
- // the active_mgr is stopped
- monc.shutdown();
- if (active_mgr) {
- active_mgr->shutdown();
- }
+ py_module_registry.shutdown();
- py_module_registry.shutdown();
+ // objecter is used by monc and active_mgr
+ objecter.shutdown();
+ // client_messenger is used by all of them, so stop it in the end
+ client_messenger->shutdown();
+ }));
- // objecter is used by monc and active_mgr
- objecter.shutdown();
- // client_messenger is used by all of them, so stop it in the end
- client_messenger->shutdown();
+ // Then stop the finisher to ensure its enqueued contexts aren't going
+ // to touch references to the things we're about to tear down
+ finisher.wait_for_empty();
+ finisher.stop();
}
void MgrStandby::respawn()
// I am the standby and someone else is active, start modules
// in standby mode to do redirects if needed
if (!py_module_registry.is_standby_running()) {
- py_module_registry.standby_start(monc);
+ py_module_registry.standby_start(monc, finisher);
}
}
}
LogChannelRef clog, audit_clog;
Mutex lock;
+ Finisher finisher;
SafeTimer timer;
PyModuleRegistry py_module_registry;
-void PyModuleRegistry::standby_start(MonClient &mc)
+void PyModuleRegistry::standby_start(MonClient &mc, Finisher &f)
{
std::lock_guard l(lock);
ceph_assert(active_modules == nullptr);
dout(4) << "Starting modules in standby mode" << dendl;
standby_modules.reset(new StandbyPyModules(
- mgr_map, module_config, clog, mc));
+ mgr_map, module_config, clog, mc, f));
std::set<std::string> failed_modules;
for (const auto &i : modules) {
if (i.second->pStandbyClass) {
dout(4) << "starting module " << i.second->get_name() << dendl;
- int r = standby_modules->start_one(i.second);
- if (r != 0) {
- derr << "failed to start module '" << i.second->get_name()
- << "'" << dendl;;
- failed_modules.insert(i.second->get_name());
- // Continue trying to load any other modules
- }
+ standby_modules->start_one(i.second);
} else {
dout(4) << "skipping module '" << i.second->get_name() << "' because "
"it does not implement a standby mode" << dendl;
}
dout(4) << "Starting " << i.first << dendl;
- int r = active_modules->start_one(i.second);
- if (r != 0) {
- derr << "Failed to run module in active mode ('" << i.first << "')"
- << dendl;
- }
+ active_modules->start_one(i.second);
}
}
MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
Objecter &objecter_, Client &client_, Finisher &f,
DaemonServer &server);
- void standby_start(MonClient &mc);
+ void standby_start(MonClient &mc, Finisher &f);
bool is_standby_running() const
{
#include "StandbyPyModules.h"
+#include "common/Finisher.h"
#include "common/debug.h"
#include "common/errno.h"
const MgrMap &mgr_map_,
PyModuleConfig &module_config,
LogChannelRef clog_,
- MonClient &monc_)
+ MonClient &monc_,
+ Finisher &f)
: state(module_config, monc_),
- clog(clog_)
+ clog(clog_),
+ finisher(f)
{
state.set_mgr_map(mgr_map_);
}
modules.clear();
}
-int StandbyPyModules::start_one(PyModuleRef py_module)
+void StandbyPyModules::start_one(PyModuleRef py_module)
{
std::lock_guard l(lock);
- const std::string &module_name = py_module->get_name();
-
- ceph_assert(modules.count(module_name) == 0);
-
- modules[module_name].reset(new StandbyPyModule(
- state,
- py_module, clog));
-
- int r = modules[module_name]->load();
- if (r != 0) {
- modules.erase(module_name);
- return r;
- } else {
- dout(4) << "Starting thread for " << module_name << dendl;
- // Giving Thread the module's module_name member as its
- // char* thread name: thread must not outlive module class lifetime.
- modules[module_name]->thread.create(
- modules[module_name]->get_name().c_str());
- return 0;
- }
+ const auto name = py_module->get_name();
+
+ ceph_assert(modules.count(name) == 0);
+
+ modules[name].reset(new StandbyPyModule(state, py_module, clog));
+ auto standby_module = modules.at(name).get();
+
+ // Send all python calls down a Finisher to avoid blocking
+ // C++ code, and avoid any potential lock cycles.
+ finisher.queue(new FunctionContext([this, standby_module, name](int) {
+ int r = standby_module->load();
+ if (r != 0) {
+ derr << "Failed to run module in standby mode ('" << name << "')"
+ << dendl;
+ std::lock_guard l(lock);
+ modules.erase(name);
+ } else {
+ dout(4) << "Starting thread for " << name << dendl;
+ standby_module->thread.create(standby_module->get_thread_name());
+ }
+ }));
}
int StandbyPyModule::load()
#include "mon/MgrMap.h"
#include "mgr/PyModuleRunner.h"
+class Finisher;
+
/**
* State that is read by all modules running in standby mode
*/
LogChannelRef clog;
+ Finisher &finisher;
+
public:
StandbyPyModules(
const MgrMap &mgr_map_,
PyModuleConfig &module_config,
LogChannelRef clog_,
- MonClient &monc);
+ MonClient &monc,
+ Finisher &f);
- int start_one(PyModuleRef py_module);
+ void start_one(PyModuleRef py_module);
void shutdown();