mgr/DaemonState.cc
mgr/DaemonServer.cc
mgr/ClusterState.cc
- mgr/PyModules.cc
+ mgr/ActivePyModules.cc
mgr/PyModuleRegistry.cc
mgr/PyFormatter.cc
mgr/PyOSDMap.cc
- mgr/PyState.cc
- mgr/MgrPyModule.cc
+ mgr/BaseMgrModule.cc
+ mgr/ActivePyModule.cc
mgr/MgrStandby.cc
mgr/Mgr.cc
mgr/Gil.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "BaseMgrModule.h"
+
+#include "PyFormatter.h"
+
+#include "common/debug.h"
+
+#include "ActivePyModule.h"
+
+//XXX courtesy of http://stackoverflow.com/questions/1418015/how-to-get-python-exception-text
+#include <boost/python.hpp>
+#include "include/assert.h" // boost clobbers this
+
+
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+// decode a Python exception into a string
+std::string handle_pyerror()
+{
+ using namespace boost::python;
+ using namespace boost;
+
+ PyObject *exc, *val, *tb;
+ object formatted_list, formatted;
+ PyErr_Fetch(&exc, &val, &tb);
+ handle<> hexc(exc), hval(allow_null(val)), htb(allow_null(tb));
+ object traceback(import("traceback"));
+ if (!tb) {
+ object format_exception_only(traceback.attr("format_exception_only"));
+ formatted_list = format_exception_only(hexc, hval);
+ } else {
+ object format_exception(traceback.attr("format_exception"));
+ formatted_list = format_exception(hexc,hval, htb);
+ }
+ formatted = str("").join(formatted_list);
+ return extract<std::string>(formatted);
+}
+
+
+
+
+void *ServeThread::entry()
+{
+ // No need to acquire the GIL here; the module does it.
+ dout(4) << "Entering thread for " << mod->get_name() << dendl;
+ mod->serve();
+ return nullptr;
+}
+
+ActivePyModule::ActivePyModule(const std::string &module_name_,
+ PyObject *pClass_,
+ PyThreadState *my_ts_)
+ : module_name(module_name_),
+ pClass(pClass_),
+ pMyThreadState(my_ts_),
+ thread(this)
+{
+}
+
+ActivePyModule::~ActivePyModule()
+{
+ if (pMyThreadState.ts != nullptr) {
+ Gil gil(pMyThreadState);
+
+ Py_XDECREF(pClassInstance);
+
+ //
+ // Ideally, now, we'd be able to do this:
+ //
+ // Py_EndInterpreter(pMyThreadState);
+ // PyThreadState_Swap(pMainThreadState);
+ //
+ // Unfortunately, if the module has any other *python* threads active
+ // at this point, Py_EndInterpreter() will abort with:
+ //
+ // Fatal Python error: Py_EndInterpreter: not the last thread
+ //
+ // This can happen when using CherryPy in a module, becuase CherryPy
+ // runs an extra thread as a timeout monitor, which spends most of its
+ // life inside a time.sleep(60). Unless you are very, very lucky with
+ // the timing calling this destructor, that thread will still be stuck
+ // in a sleep, and Py_EndInterpreter() will abort.
+ //
+ // This could of course also happen with a poorly written module which
+ // made no attempt to clean up any additional threads it created.
+ //
+ // The safest thing to do is just not call Py_EndInterpreter(), and
+ // let Py_Finalize() kill everything after all modules are shut down.
+ //
+ }
+}
+
+int ActivePyModule::load(ActivePyModules *py_modules)
+{
+ Gil gil(pMyThreadState);
+
+ // We tell the module how we name it, so that it can be consistent
+ // with us in logging etc.
+ auto pThisPtr = PyCapsule_New(this, nullptr, nullptr);
+ auto pPyModules = PyCapsule_New(py_modules, nullptr, nullptr);
+ auto pModuleName = PyString_FromString(module_name.c_str());
+ auto pArgs = PyTuple_Pack(3, pModuleName, pPyModules, pThisPtr);
+
+ pClassInstance = PyObject_CallObject(pClass, pArgs);
+ Py_DECREF(pClass);
+ Py_DECREF(pModuleName);
+ Py_DECREF(pArgs);
+ if (pClassInstance == nullptr) {
+ derr << "Failed to construct class in '" << module_name << "'" << dendl;
+ derr << handle_pyerror() << dendl;
+ return -EINVAL;
+ } else {
+ dout(1) << "Constructed class from module: " << module_name << dendl;
+ }
+
+ return load_commands();
+}
+
+int ActivePyModule::serve()
+{
+ assert(pClassInstance != nullptr);
+
+ // This method is called from a separate OS thread (i.e. a thread not
+ // created by Python), so tell Gil to wrap this in a new thread state.
+ Gil gil(pMyThreadState, true);
+
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("serve"), nullptr);
+
+ int r = 0;
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ derr << module_name << ".serve:" << dendl;
+ derr << handle_pyerror() << dendl;
+ return -EINVAL;
+ }
+
+ return r;
+}
+
+void ActivePyModule::shutdown()
+{
+ assert(pClassInstance != nullptr);
+
+ Gil gil(pMyThreadState);
+
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("shutdown"), nullptr);
+
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ derr << "Failed to invoke shutdown() on " << module_name << dendl;
+ derr << handle_pyerror() << dendl;
+ }
+}
+
+void ActivePyModule::notify(const std::string ¬ify_type, const std::string ¬ify_id)
+{
+ assert(pClassInstance != nullptr);
+
+ Gil gil(pMyThreadState);
+
+ // Execute
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("notify"), const_cast<char*>("(ss)"),
+ notify_type.c_str(), notify_id.c_str());
+
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ derr << module_name << ".notify:" << dendl;
+ derr << handle_pyerror() << dendl;
+ // FIXME: callers can't be expected to handle a python module
+ // that has spontaneously broken, but Mgr() should provide
+ // a hook to unload misbehaving modules when they have an
+ // error somewhere like this
+ }
+}
+
+void ActivePyModule::notify_clog(const LogEntry &log_entry)
+{
+ assert(pClassInstance != nullptr);
+
+ Gil gil(pMyThreadState);
+
+ // Construct python-ized LogEntry
+ PyFormatter f;
+ log_entry.dump(&f);
+ auto py_log_entry = f.get();
+
+ // Execute
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("notify"), const_cast<char*>("(sN)"),
+ "clog", py_log_entry);
+
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ derr << module_name << ".notify_clog:" << dendl;
+ derr << handle_pyerror() << dendl;
+ // FIXME: callers can't be expected to handle a python module
+ // that has spontaneously broken, but Mgr() should provide
+ // a hook to unload misbehaving modules when they have an
+ // error somewhere like this
+ }
+}
+
+int ActivePyModule::load_commands()
+{
+ // Don't need a Gil here -- this is called from ActivePyModule::load(),
+ // which already has one.
+ PyObject *command_list = PyObject_GetAttrString(pClassInstance, "COMMANDS");
+ assert(command_list != nullptr);
+ const size_t list_size = PyList_Size(command_list);
+ for (size_t i = 0; i < list_size; ++i) {
+ PyObject *command = PyList_GetItem(command_list, i);
+ assert(command != nullptr);
+
+ ModuleCommand item;
+
+ PyObject *pCmd = PyDict_GetItemString(command, "cmd");
+ assert(pCmd != nullptr);
+ item.cmdstring = PyString_AsString(pCmd);
+
+ dout(20) << "loaded command " << item.cmdstring << dendl;
+
+ PyObject *pDesc = PyDict_GetItemString(command, "desc");
+ assert(pDesc != nullptr);
+ item.helpstring = PyString_AsString(pDesc);
+
+ PyObject *pPerm = PyDict_GetItemString(command, "perm");
+ assert(pPerm != nullptr);
+ item.perm = PyString_AsString(pPerm);
+
+ item.handler = this;
+
+ commands.push_back(item);
+ }
+ Py_DECREF(command_list);
+
+ dout(10) << "loaded " << commands.size() << " commands" << dendl;
+
+ return 0;
+}
+
+int ActivePyModule::handle_command(
+ const cmdmap_t &cmdmap,
+ std::stringstream *ds,
+ std::stringstream *ss)
+{
+ assert(ss != nullptr);
+ assert(ds != nullptr);
+
+ Gil gil(pMyThreadState);
+
+ PyFormatter f;
+ cmdmap_dump(cmdmap, &f);
+ PyObject *py_cmd = f.get();
+
+ auto pResult = PyObject_CallMethod(pClassInstance,
+ const_cast<char*>("handle_command"), const_cast<char*>("(O)"), py_cmd);
+
+ Py_DECREF(py_cmd);
+
+ int r = 0;
+ if (pResult != NULL) {
+ if (PyTuple_Size(pResult) != 3) {
+ r = -EINVAL;
+ } else {
+ r = PyInt_AsLong(PyTuple_GetItem(pResult, 0));
+ *ds << PyString_AsString(PyTuple_GetItem(pResult, 1));
+ *ss << PyString_AsString(PyTuple_GetItem(pResult, 2));
+ }
+
+ Py_DECREF(pResult);
+ } else {
+ *ds << "";
+ *ss << handle_pyerror();
+ r = -EINVAL;
+ }
+
+ return r;
+}
+
+void ActivePyModule::get_health_checks(health_check_map_t *checks)
+{
+ checks->merge(health_checks);
+}
--- /dev/null
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#pragma once
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+
+#include "common/cmdparse.h"
+#include "common/LogEntry.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
+#include "mon/health_check.h"
+#include "mgr/Gil.h"
+
+#include <vector>
+#include <string>
+
+
+class ActivePyModule;
+class ActivePyModules;
+
+/**
+ * A Ceph CLI command description provided from a Python module
+ */
+class ModuleCommand {
+public:
+ std::string cmdstring;
+ std::string helpstring;
+ std::string perm;
+ ActivePyModule *handler;
+};
+
+class ServeThread : public Thread
+{
+ ActivePyModule *mod;
+
+public:
+ ServeThread(ActivePyModule *mod_)
+ : mod(mod_) {}
+
+ void *entry() override;
+};
+
+class ActivePyModule
+{
+private:
+ const std::string module_name;
+
+ // Passed in by whoever loaded our python module and looked up
+ // the symbols in it.
+ PyObject *pClass = nullptr;
+
+ // Passed in by whoever created our subinterpreter for us
+ SafeThreadState pMyThreadState = nullptr;
+
+ // Populated when we construct our instance of pClass in load()
+ PyObject *pClassInstance = nullptr;
+
+ health_check_map_t health_checks;
+
+ std::vector<ModuleCommand> commands;
+
+ int load_commands();
+
+ // Optional, URI exposed by plugins that implement serve()
+ std::string uri;
+
+public:
+ ActivePyModule(const std::string &module_name,
+ PyObject *pClass_,
+ PyThreadState *my_ts);
+ ~ActivePyModule();
+
+ ServeThread thread;
+
+ int load(ActivePyModules *py_modules);
+ int serve();
+ void shutdown();
+ void notify(const std::string ¬ify_type, const std::string ¬ify_id);
+ void notify_clog(const LogEntry &le);
+
+ const std::vector<ModuleCommand> &get_commands() const
+ {
+ return commands;
+ }
+
+ const std::string &get_name() const
+ {
+ return module_name;
+ }
+
+ int handle_command(
+ const cmdmap_t &cmdmap,
+ std::stringstream *ds,
+ std::stringstream *ss);
+
+ void set_health_checks(health_check_map_t&& c) {
+ health_checks = std::move(c);
+ }
+ void get_health_checks(health_check_map_t *checks);
+
+ void set_uri(const std::string &str)
+ {
+ uri = str;
+ }
+
+ std::string get_uri() const
+ {
+ return uri;
+ }
+};
+
+std::string handle_pyerror();
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+// Include this first to get python headers earlier
+#include "BaseMgrModule.h"
+#include "Gil.h"
+
+#include "common/errno.h"
+#include "include/stringify.h"
+
+#include "PyFormatter.h"
+
+#include "osd/OSDMap.h"
+#include "mon/MonMap.h"
+
+#include "mgr/MgrContext.h"
+
+// For ::config_prefix
+#include "PyModuleRegistry.h"
+
+#include "ActivePyModules.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+// constructor/destructor implementations cannot be in .h,
+// because ServeThread is still an "incomplete" type there
+
+ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
+ DaemonStateIndex &ds, ClusterState &cs,
+ MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
+ Client &client_, Finisher &f)
+ : config_cache(config_), daemon_state(ds), cluster_state(cs),
+ monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
+ lock("ActivePyModules")
+{}
+
+ActivePyModules::~ActivePyModules() = default;
+
+void ActivePyModules::dump_server(const std::string &hostname,
+ const DaemonStateCollection &dmc,
+ Formatter *f)
+{
+ f->dump_string("hostname", hostname);
+ f->open_array_section("services");
+ std::string ceph_version;
+
+ for (const auto &i : dmc) {
+ Mutex::Locker l(i.second->lock);
+ const auto &key = i.first;
+ const std::string &str_type = key.first;
+ const std::string &svc_name = key.second;
+
+ // TODO: pick the highest version, and make sure that
+ // somewhere else (during health reporting?) we are
+ // indicating to the user if we see mixed versions
+ auto ver_iter = i.second->metadata.find("ceph_version");
+ if (ver_iter != i.second->metadata.end()) {
+ ceph_version = i.second->metadata.at("ceph_version");
+ }
+
+ f->open_object_section("service");
+ f->dump_string("type", str_type);
+ f->dump_string("id", svc_name);
+ f->close_section();
+ }
+ f->close_section();
+
+ f->dump_string("ceph_version", ceph_version);
+}
+
+
+
+PyObject *ActivePyModules::get_server_python(const std::string &hostname)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+ dout(10) << " (" << hostname << ")" << dendl;
+
+ auto dmc = daemon_state.get_by_server(hostname);
+
+ PyFormatter f;
+ dump_server(hostname, dmc, &f);
+ return f.get();
+}
+
+
+PyObject *ActivePyModules::list_servers_python()
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+ dout(10) << " >" << dendl;
+
+ PyFormatter f(false, true);
+ daemon_state.with_daemons_by_server([this, &f]
+ (const std::map<std::string, DaemonStateCollection> &all) {
+ for (const auto &i : all) {
+ const auto &hostname = i.first;
+
+ f.open_object_section("server");
+ dump_server(hostname, i.second, &f);
+ f.close_section();
+ }
+ });
+
+ return f.get();
+}
+
+PyObject *ActivePyModules::get_metadata_python(
+ const std::string &svc_type,
+ const std::string &svc_id)
+{
+ auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+ if (metadata == nullptr) {
+ derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
+ Py_RETURN_NONE;
+ }
+
+ Mutex::Locker l(metadata->lock);
+ PyFormatter f;
+ f.dump_string("hostname", metadata->hostname);
+ for (const auto &i : metadata->metadata) {
+ f.dump_string(i.first.c_str(), i.second);
+ }
+
+ return f.get();
+}
+
+PyObject *ActivePyModules::get_daemon_status_python(
+ const std::string &svc_type,
+ const std::string &svc_id)
+{
+ auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+ if (metadata == nullptr) {
+ derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
+ Py_RETURN_NONE;
+ }
+
+ Mutex::Locker l(metadata->lock);
+ PyFormatter f;
+ for (const auto &i : metadata->service_status) {
+ f.dump_string(i.first.c_str(), i.second);
+ }
+ return f.get();
+}
+
+PyObject *ActivePyModules::get_python(const std::string &what)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ if (what == "fs_map") {
+ PyFormatter f;
+ cluster_state.with_fsmap([&f](const FSMap &fsmap) {
+ fsmap.dump(&f);
+ });
+ return f.get();
+ } else if (what == "osdmap_crush_map_text") {
+ bufferlist rdata;
+ cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
+ osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
+ });
+ std::string crush_text = rdata.to_str();
+ return PyString_FromString(crush_text.c_str());
+ } else if (what.substr(0, 7) == "osd_map") {
+ PyFormatter f;
+ cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
+ if (what == "osd_map") {
+ osd_map.dump(&f);
+ } else if (what == "osd_map_tree") {
+ osd_map.print_tree(&f, nullptr);
+ } else if (what == "osd_map_crush") {
+ osd_map.crush->dump(&f);
+ }
+ });
+ return f.get();
+ } else if (what == "config") {
+ PyFormatter f;
+ g_conf->show_config(&f);
+ return f.get();
+ } else if (what == "mon_map") {
+ PyFormatter f;
+ cluster_state.with_monmap(
+ [&f](const MonMap &monmap) {
+ monmap.dump(&f);
+ }
+ );
+ return f.get();
+ } else if (what == "service_map") {
+ PyFormatter f;
+ cluster_state.with_servicemap(
+ [&f](const ServiceMap &service_map) {
+ service_map.dump(&f);
+ }
+ );
+ return f.get();
+ } else if (what == "osd_metadata") {
+ PyFormatter f;
+ auto dmc = daemon_state.get_by_service("osd");
+ for (const auto &i : dmc) {
+ Mutex::Locker l(i.second->lock);
+ f.open_object_section(i.first.second.c_str());
+ f.dump_string("hostname", i.second->hostname);
+ for (const auto &j : i.second->metadata) {
+ f.dump_string(j.first.c_str(), j.second);
+ }
+ f.close_section();
+ }
+ return f.get();
+ } else if (what == "pg_summary") {
+ PyFormatter f;
+ cluster_state.with_pgmap(
+ [&f](const PGMap &pg_map) {
+ std::map<std::string, std::map<std::string, uint32_t> > osds;
+ std::map<std::string, std::map<std::string, uint32_t> > pools;
+ std::map<std::string, uint32_t> all;
+ for (const auto &i : pg_map.pg_stat) {
+ const auto pool = i.first.m_pool;
+ const std::string state = pg_state_string(i.second.state);
+ // Insert to per-pool map
+ pools[stringify(pool)][state]++;
+ for (const auto &osd_id : i.second.acting) {
+ osds[stringify(osd_id)][state]++;
+ }
+ all[state]++;
+ }
+ f.open_object_section("by_osd");
+ for (const auto &i : osds) {
+ f.open_object_section(i.first.c_str());
+ for (const auto &j : i.second) {
+ f.dump_int(j.first.c_str(), j.second);
+ }
+ f.close_section();
+ }
+ f.close_section();
+ f.open_object_section("by_pool");
+ for (const auto &i : pools) {
+ f.open_object_section(i.first.c_str());
+ for (const auto &j : i.second) {
+ f.dump_int(j.first.c_str(), j.second);
+ }
+ f.close_section();
+ }
+ f.close_section();
+ f.open_object_section("all");
+ for (const auto &i : all) {
+ f.dump_int(i.first.c_str(), i.second);
+ }
+ f.close_section();
+ }
+ );
+ return f.get();
+ } else if (what == "pg_status") {
+ PyFormatter f;
+ cluster_state.with_pgmap(
+ [&f](const PGMap &pg_map) {
+ pg_map.print_summary(&f, nullptr);
+ }
+ );
+ return f.get();
+ } else if (what == "pg_dump") {
+ PyFormatter f;
+ cluster_state.with_pgmap(
+ [&f](const PGMap &pg_map) {
+ pg_map.dump(&f);
+ }
+ );
+ return f.get();
+ } else if (what == "df") {
+ PyFormatter f;
+
+ cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
+ cluster_state.with_pgmap(
+ [&osd_map, &f](const PGMap &pg_map) {
+ pg_map.dump_fs_stats(nullptr, &f, true);
+ pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
+ });
+ });
+ return f.get();
+ } else if (what == "osd_stats") {
+ PyFormatter f;
+ cluster_state.with_pgmap(
+ [&f](const PGMap &pg_map) {
+ pg_map.dump_osd_stats(&f);
+ });
+ return f.get();
+ } else if (what == "health" || what == "mon_status") {
+ PyFormatter f;
+ bufferlist json;
+ if (what == "health") {
+ json = cluster_state.get_health();
+ } else if (what == "mon_status") {
+ json = cluster_state.get_mon_status();
+ } else {
+ ceph_abort();
+ }
+ f.dump_string("json", json.to_str());
+ return f.get();
+ } else if (what == "mgr_map") {
+ PyFormatter f;
+ cluster_state.with_mgrmap([&f](const MgrMap &mgr_map) {
+ mgr_map.dump(&f);
+ });
+ return f.get();
+ } else {
+ derr << "Python module requested unknown data '" << what << "'" << dendl;
+ Py_RETURN_NONE;
+ }
+}
+
+void ActivePyModules::start_one(std::string const &module_name,
+ PyObject *pClass, PyThreadState *pMyThreadState)
+{
+ Mutex::Locker l(lock);
+
+ assert(modules.count(module_name) == 0);
+
+ modules[module_name].reset(new ActivePyModule(
+ module_name, pClass,
+ pMyThreadState));
+
+ int r = modules[module_name]->load(this);
+
+ // FIXME error handling
+ assert(r == 0);
+
+ std::ostringstream thread_name;
+ thread_name << "mgr." << module_name;
+ dout(4) << "Starting thread for " << module_name << dendl;
+ modules[module_name]->thread.create(thread_name.str().c_str());
+}
+
+void ActivePyModules::shutdown()
+{
+ Mutex::Locker locker(lock);
+
+ // Signal modules to drop out of serve() and/or tear down resources
+ for (auto &i : modules) {
+ auto module = i.second.get();
+ const auto& name = i.first;
+ dout(10) << "waiting for module " << name << " to shutdown" << dendl;
+ lock.Unlock();
+ module->shutdown();
+ lock.Lock();
+ dout(10) << "module " << name << " shutdown" << dendl;
+ }
+
+ // For modules implementing serve(), finish the threads where we
+ // were running that.
+ for (auto &i : modules) {
+ lock.Unlock();
+ i.second->thread.join();
+ lock.Lock();
+ }
+
+ modules.clear();
+}
+
+void ActivePyModules::notify_all(const std::string ¬ify_type,
+ const std::string ¬ify_id)
+{
+ Mutex::Locker l(lock);
+
+ dout(10) << __func__ << ": notify_all " << notify_type << dendl;
+ for (auto& i : modules) {
+ auto module = i.second.get();
+ // Send all python calls down a Finisher to avoid blocking
+ // C++ code, and avoid any potential lock cycles.
+ finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
+ module->notify(notify_type, notify_id);
+ }));
+ }
+}
+
+void ActivePyModules::notify_all(const LogEntry &log_entry)
+{
+ Mutex::Locker l(lock);
+
+ dout(10) << __func__ << ": notify_all (clog)" << dendl;
+ for (auto& i : modules) {
+ auto module = i.second.get();
+ // Send all python calls down a Finisher to avoid blocking
+ // C++ code, and avoid any potential lock cycles.
+ //
+ // Note intentional use of non-reference lambda binding on
+ // log_entry: we take a copy because caller's instance is
+ // probably ephemeral.
+ finisher.queue(new FunctionContext([module, log_entry](int r){
+ module->notify_clog(log_entry);
+ }));
+ }
+}
+
+bool ActivePyModules::get_config(const std::string &module_name,
+ const std::string &key, std::string *val) const
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ const std::string global_key = PyModuleRegistry::config_prefix
+ + module_name + "/" + key;
+
+ dout(4) << __func__ << "key: " << global_key << dendl;
+
+ if (config_cache.count(global_key)) {
+ *val = config_cache.at(global_key);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+PyObject *ActivePyModules::get_config_prefix(const std::string &module_name,
+ const std::string &prefix) const
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ const std::string base_prefix = PyModuleRegistry::config_prefix
+ + module_name + "/";
+ const std::string global_prefix = base_prefix + prefix;
+ dout(4) << __func__ << "prefix: " << global_prefix << dendl;
+
+ PyFormatter f;
+ for (auto p = config_cache.lower_bound(global_prefix);
+ p != config_cache.end() && p->first.find(global_prefix) == 0;
+ ++p) {
+ f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
+ }
+ return f.get();
+}
+
+void ActivePyModules::set_config(const std::string &module_name,
+ const std::string &key, const boost::optional<std::string>& val)
+{
+ const std::string global_key = PyModuleRegistry::config_prefix
+ + module_name + "/" + key;
+
+ Command set_cmd;
+ {
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+ if (val) {
+ config_cache[global_key] = *val;
+ } else {
+ config_cache.erase(global_key);
+ }
+
+ std::ostringstream cmd_json;
+ JSONFormatter jf;
+ jf.open_object_section("cmd");
+ if (val) {
+ jf.dump_string("prefix", "config-key set");
+ jf.dump_string("key", global_key);
+ jf.dump_string("val", *val);
+ } else {
+ jf.dump_string("prefix", "config-key del");
+ jf.dump_string("key", global_key);
+ }
+ jf.close_section();
+ jf.flush(cmd_json);
+ set_cmd.run(&monc, cmd_json.str());
+ }
+ set_cmd.wait();
+
+ if (set_cmd.r != 0) {
+ // config-key set will fail if mgr's auth key has insufficient
+ // permission to set config keys
+ // FIXME: should this somehow raise an exception back into Python land?
+ dout(0) << "`config-key set " << global_key << " " << val << "` failed: "
+ << cpp_strerror(set_cmd.r) << dendl;
+ dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
+ }
+}
+
+std::vector<ModuleCommand> ActivePyModules::get_py_commands() const
+{
+ Mutex::Locker l(lock);
+
+ std::vector<ModuleCommand> result;
+ for (const auto& i : modules) {
+ auto module = i.second.get();
+ auto mod_commands = module->get_commands();
+ for (auto j : mod_commands) {
+ result.push_back(j);
+ }
+ }
+
+ return result;
+}
+
+std::vector<MonCommand> ActivePyModules::get_commands() const
+{
+ std::vector<ModuleCommand> commands = get_py_commands();
+ std::vector<MonCommand> result;
+ for (auto &pyc: commands) {
+ result.push_back({pyc.cmdstring, pyc.helpstring, "mgr",
+ pyc.perm, "cli", MonCommand::FLAG_MGR});
+ }
+ return result;
+}
+
+
+std::map<std::string, std::string> ActivePyModules::get_services() const
+{
+ std::map<std::string, std::string> result;
+ Mutex::Locker l(lock);
+ for (const auto& i : modules) {
+ const auto &module = i.second.get();
+ std::string svc_str = module->get_uri();
+ if (!svc_str.empty()) {
+ result[module->get_name()] = svc_str;
+ }
+ }
+
+ return result;
+}
+
+void ActivePyModules::log(const std::string &module_name,
+ int level, const std::string &record)
+{
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[" << module_name << "] "
+ dout(level) << record << dendl;
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+}
+
+PyObject* ActivePyModules::get_counter_python(
+ const std::string &svc_name,
+ const std::string &svc_id,
+ const std::string &path)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ PyFormatter f;
+ f.open_array_section(path.c_str());
+
+ auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
+ if (metadata) {
+ Mutex::Locker l2(metadata->lock);
+ if (metadata->perf_counters.instances.count(path)) {
+ auto counter_instance = metadata->perf_counters.instances.at(path);
+ const auto &data = counter_instance.get_data();
+ for (const auto &datapoint : data) {
+ f.open_array_section("datapoint");
+ f.dump_unsigned("t", datapoint.t.sec());
+ f.dump_unsigned("v", datapoint.v);
+ f.close_section();
+
+ }
+ } else {
+ dout(4) << "Missing counter: '" << path << "' ("
+ << svc_name << "." << svc_id << ")" << dendl;
+ dout(20) << "Paths are:" << dendl;
+ for (const auto &i : metadata->perf_counters.instances) {
+ dout(20) << i.first << dendl;
+ }
+ }
+ } else {
+ dout(4) << "No daemon state for "
+ << svc_name << "." << svc_id << ")" << dendl;
+ }
+ f.close_section();
+ return f.get();
+}
+
+PyObject* ActivePyModules::get_perf_schema_python(
+ const std::string svc_type,
+ const std::string &svc_id)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ DaemonStateCollection daemons;
+
+ if (svc_type == "") {
+ daemons = daemon_state.get_all();
+ } else if (svc_id.empty()) {
+ daemons = daemon_state.get_by_service(svc_type);
+ } else {
+ auto key = DaemonKey(svc_type, svc_id);
+ // so that the below can be a loop in all cases
+ auto got = daemon_state.get(key);
+ if (got != nullptr) {
+ daemons[key] = got;
+ }
+ }
+
+ PyFormatter f;
+ if (!daemons.empty()) {
+ for (auto statepair : daemons) {
+ auto key = statepair.first;
+ auto state = statepair.second;
+
+ std::ostringstream daemon_name;
+ daemon_name << key.first << "." << key.second;
+ f.open_object_section(daemon_name.str().c_str());
+
+ Mutex::Locker l(state->lock);
+ for (auto ctr_inst_iter : state->perf_counters.instances) {
+ const auto &counter_name = ctr_inst_iter.first;
+ f.open_object_section(counter_name.c_str());
+ auto type = state->perf_counters.types[counter_name];
+ f.dump_string("description", type.description);
+ if (!type.nick.empty()) {
+ f.dump_string("nick", type.nick);
+ }
+ f.dump_unsigned("type", type.type);
+ f.dump_unsigned("priority", type.priority);
+ f.close_section();
+ }
+ f.close_section();
+ }
+ } else {
+ dout(4) << __func__ << ": No daemon state found for "
+ << svc_type << "." << svc_id << ")" << dendl;
+ }
+ return f.get();
+}
+
+PyObject *ActivePyModules::get_context()
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ // Construct a capsule containing ceph context.
+ // Not incrementing/decrementing ref count on the context because
+ // it's the global one and it has process lifetime.
+ auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
+ return capsule;
+}
+
+static void delete_osdmap(PyObject *object)
+{
+ OSDMap *osdmap = static_cast<OSDMap*>(PyCapsule_GetPointer(object, nullptr));
+ assert(osdmap);
+ dout(10) << __func__ << " " << osdmap << dendl;
+ delete osdmap;
+}
+
+PyObject *ActivePyModules::get_osdmap()
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ // Construct a capsule containing an OSDMap.
+ OSDMap *newmap = new OSDMap;
+ cluster_state.with_osdmap([&](const OSDMap& o) {
+ newmap->deepish_copy_from(o);
+ });
+ dout(10) << __func__ << " " << newmap << dendl;
+ return PyCapsule_New(newmap, nullptr, &delete_osdmap);
+}
+
+void ActivePyModules::set_health_checks(const std::string& module_name,
+ health_check_map_t&& checks)
+{
+ Mutex::Locker l(lock);
+ auto p = modules.find(module_name);
+ if (p != modules.end()) {
+ p->second->set_health_checks(std::move(checks));
+ }
+}
+
+void ActivePyModules::get_health_checks(health_check_map_t *checks)
+{
+ Mutex::Locker l(lock);
+ for (auto& p : modules) {
+ p.second->get_health_checks(checks);
+ }
+}
+
+void ActivePyModules::set_uri(const std::string& module_name,
+ const std::string &uri)
+{
+ Mutex::Locker l(lock);
+
+ dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;
+
+ modules[module_name]->set_uri(uri);
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#pragma once
+
+#include "ActivePyModule.h"
+
+#include "common/Finisher.h"
+#include "common/Mutex.h"
+
+#include "osdc/Objecter.h"
+#include "client/Client.h"
+#include "common/LogClient.h"
+#include "mon/MgrMap.h"
+#include "mon/MonCommand.h"
+
+#include "DaemonState.h"
+#include "ClusterState.h"
+
+class ServeThread;
+class health_check_map_t;
+
+typedef std::map<std::string, std::string> PyModuleConfig;
+
+class ActivePyModules
+{
+
+ std::map<std::string, std::unique_ptr<ActivePyModule>> modules;
+ PyModuleConfig config_cache;
+ DaemonStateIndex &daemon_state;
+ ClusterState &cluster_state;
+ MonClient &monc;
+ LogChannelRef clog;
+ Objecter &objecter;
+ Client &client;
+ Finisher &finisher;
+
+
+ mutable Mutex lock{"ActivePyModules::lock"};
+
+public:
+ ActivePyModules(PyModuleConfig const &config_,
+ DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
+ LogChannelRef clog_, Objecter &objecter_, Client &client_,
+ Finisher &f);
+
+ ~ActivePyModules();
+
+ // FIXME: wrap for send_command?
+ MonClient &get_monc() {return monc;}
+ Objecter &get_objecter() {return objecter;}
+ Client &get_client() {return client;}
+
+ PyObject *get_python(const std::string &what);
+ PyObject *get_server_python(const std::string &hostname);
+ PyObject *list_servers_python();
+ PyObject *get_metadata_python(
+ const std::string &svc_type, const std::string &svc_id);
+ PyObject *get_daemon_status_python(
+ const std::string &svc_type, const std::string &svc_id);
+ PyObject *get_counter_python(
+ const std::string &svc_type,
+ const std::string &svc_id,
+ const std::string &path);
+ PyObject *get_perf_schema_python(
+ const std::string svc_type,
+ const std::string &svc_id);
+ PyObject *get_context();
+ PyObject *get_osdmap();
+
+ bool get_config(const std::string &module_name,
+ const std::string &key, std::string *val) const;
+ PyObject *get_config_prefix(const std::string &module_name,
+ const std::string &prefix) const;
+ void set_config(const std::string &module_name,
+ const std::string &key, const boost::optional<std::string> &val);
+
+ void set_health_checks(const std::string& module_name,
+ health_check_map_t&& checks);
+ void get_health_checks(health_check_map_t *checks);
+
+ void set_uri(const std::string& module_name, const std::string &uri);
+
+ void log(const std::string &module_name,
+ int level, const std::string &record);
+
+ // Python command definitions, including callback
+ std::vector<ModuleCommand> get_py_commands() const;
+
+ // Monitor command definitions, suitable for CLI
+ std::vector<MonCommand> get_commands() const;
+
+ std::map<std::string, std::string> get_services() const;
+
+ // Public so that MonCommandCompletion can use it
+ // FIXME: for send_command completion notifications,
+ // send it to only the module that sent the command, not everyone
+ void notify_all(const std::string ¬ify_type,
+ const std::string ¬ify_id);
+ void notify_all(const LogEntry &log_entry);
+
+ int init();
+ void shutdown();
+
+ void start_one(std::string const &module_name,
+ PyObject *pClass,
+ PyThreadState *pMyThreadState);
+
+ void dump_server(const std::string &hostname,
+ const DaemonStateCollection &dmc,
+ Formatter *f);
+};
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+/**
+ * The interface we present to python code that runs within
+ * ceph-mgr. This is implemented as a Python class from which
+ * all modules must inherit -- access to the Ceph state is then
+ * available as methods on that object.
+ */
+
+#include "Mgr.h"
+
+#include "mon/MonClient.h"
+#include "common/errno.h"
+#include "common/version.h"
+
+#include "BaseMgrModule.h"
+#include "Gil.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+
+#define PLACEHOLDER ""
+
+
+typedef struct {
+ PyObject_HEAD
+ ActivePyModules *py_modules;
+ ActivePyModule *this_module;
+} BaseMgrModule;
+
+class MonCommandCompletion : public Context
+{
+ ActivePyModules *py_modules;
+ PyObject *python_completion;
+ const std::string tag;
+ SafeThreadState pThreadState;
+
+public:
+ std::string outs;
+ bufferlist outbl;
+
+ MonCommandCompletion(
+ ActivePyModules *py_modules_, PyObject* ev,
+ const std::string &tag_, PyThreadState *ts_)
+ : py_modules(py_modules_), python_completion(ev),
+ tag(tag_), pThreadState(ts_)
+ {
+ assert(python_completion != nullptr);
+ Py_INCREF(python_completion);
+ }
+
+ ~MonCommandCompletion() override
+ {
+ Py_DECREF(python_completion);
+ }
+
+ void finish(int r) override
+ {
+ dout(10) << "MonCommandCompletion::finish()" << dendl;
+ {
+ // Scoped so the Gil is released before calling notify_all()
+ // Create new thread state because this is called via the MonClient
+ // Finisher, not the PyModules finisher.
+ Gil gil(pThreadState, true);
+
+ auto set_fn = PyObject_GetAttrString(python_completion, "complete");
+ assert(set_fn != nullptr);
+
+ auto pyR = PyInt_FromLong(r);
+ auto pyOutBl = PyString_FromString(outbl.to_str().c_str());
+ auto pyOutS = PyString_FromString(outs.c_str());
+ auto args = PyTuple_Pack(3, pyR, pyOutBl, pyOutS);
+ Py_DECREF(pyR);
+ Py_DECREF(pyOutBl);
+ Py_DECREF(pyOutS);
+
+ auto rtn = PyObject_CallObject(set_fn, args);
+ if (rtn != nullptr) {
+ Py_DECREF(rtn);
+ }
+ Py_DECREF(args);
+ }
+ py_modules->notify_all("command", tag);
+ }
+};
+
+
+static PyObject*
+ceph_send_command(BaseMgrModule *self, PyObject *args)
+{
+ // Like mon, osd, mds
+ char *type = nullptr;
+
+ // Like "23" for an OSD or "myid" for an MDS
+ char *name = nullptr;
+
+ char *cmd_json = nullptr;
+ char *tag = nullptr;
+ PyObject *completion = nullptr;
+ if (!PyArg_ParseTuple(args, "Ossss:ceph_send_command",
+ &completion, &type, &name, &cmd_json, &tag)) {
+ return nullptr;
+ }
+
+ auto set_fn = PyObject_GetAttrString(completion, "complete");
+ if (set_fn == nullptr) {
+ ceph_abort(); // TODO raise python exception instead
+ } else {
+ assert(PyCallable_Check(set_fn));
+ }
+ Py_DECREF(set_fn);
+
+ auto c = new MonCommandCompletion(self->py_modules,
+ completion, tag, PyThreadState_Get());
+ if (std::string(type) == "mon") {
+ self->py_modules->get_monc().start_mon_command(
+ {cmd_json},
+ {},
+ &c->outbl,
+ &c->outs,
+ c);
+ } else if (std::string(type) == "osd") {
+ std::string err;
+ uint64_t osd_id = strict_strtoll(name, 10, &err);
+ if (!err.empty()) {
+ delete c;
+ string msg("invalid osd_id: ");
+ msg.append("\"").append(name).append("\"");
+ PyErr_SetString(PyExc_ValueError, msg.c_str());
+ return nullptr;
+ }
+
+ ceph_tid_t tid;
+ self->py_modules->get_objecter().osd_command(
+ osd_id,
+ {cmd_json},
+ {},
+ &tid,
+ &c->outbl,
+ &c->outs,
+ c);
+ } else if (std::string(type) == "mds") {
+ int r = self->py_modules->get_client().mds_command(
+ name,
+ {cmd_json},
+ {},
+ &c->outbl,
+ &c->outs,
+ c);
+ if (r != 0) {
+ string msg("failed to send command to mds: ");
+ msg.append(cpp_strerror(r));
+ PyErr_SetString(PyExc_RuntimeError, msg.c_str());
+ return nullptr;
+ }
+ } else if (std::string(type) == "pg") {
+ pg_t pgid;
+ if (!pgid.parse(name)) {
+ delete c;
+ string msg("invalid pgid: ");
+ msg.append("\"").append(name).append("\"");
+ PyErr_SetString(PyExc_ValueError, msg.c_str());
+ return nullptr;
+ }
+
+ ceph_tid_t tid;
+ self->py_modules->get_objecter().pg_command(
+ pgid,
+ {cmd_json},
+ {},
+ &tid,
+ &c->outbl,
+ &c->outs,
+ c);
+ return nullptr;
+ } else {
+ delete c;
+ string msg("unknown service type: ");
+ msg.append(type);
+ PyErr_SetString(PyExc_ValueError, msg.c_str());
+ return nullptr;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject*
+ceph_set_health_checks(BaseMgrModule *self, PyObject *args)
+{
+ PyObject *checks = NULL;
+ if (!PyArg_ParseTuple(args, "O:ceph_set_health_checks", &checks)) {
+ return NULL;
+ }
+ if (!PyDict_Check(checks)) {
+ derr << __func__ << " arg not a dict" << dendl;
+ Py_RETURN_NONE;
+ }
+ PyObject *checksls = PyDict_Items(checks);
+ health_check_map_t out_checks;
+ for (int i = 0; i < PyList_Size(checksls); ++i) {
+ PyObject *kv = PyList_GET_ITEM(checksls, i);
+ char *check_name = nullptr;
+ PyObject *check_info = nullptr;
+ if (!PyArg_ParseTuple(kv, "sO:pair", &check_name, &check_info)) {
+ derr << __func__ << " dict item " << i
+ << " not a size 2 tuple" << dendl;
+ continue;
+ }
+ if (!PyDict_Check(check_info)) {
+ derr << __func__ << " item " << i << " " << check_name
+ << " value not a dict" << dendl;
+ continue;
+ }
+ health_status_t severity = HEALTH_OK;
+ string summary;
+ list<string> detail;
+ PyObject *infols = PyDict_Items(check_info);
+ for (int j = 0; j < PyList_Size(infols); ++j) {
+ PyObject *pair = PyList_GET_ITEM(infols, j);
+ if (!PyTuple_Check(pair)) {
+ derr << __func__ << " item " << i << " pair " << j
+ << " not a tuple" << dendl;
+ continue;
+ }
+ char *k = nullptr;
+ PyObject *v = nullptr;
+ if (!PyArg_ParseTuple(pair, "sO:pair", &k, &v)) {
+ derr << __func__ << " item " << i << " pair " << j
+ << " not a size 2 tuple" << dendl;
+ continue;
+ }
+ string ks(k);
+ if (ks == "severity") {
+ if (!PyString_Check(v)) {
+ derr << __func__ << " check " << check_name
+ << " severity value not string" << dendl;
+ continue;
+ }
+ string vs(PyString_AsString(v));
+ if (vs == "warning") {
+ severity = HEALTH_WARN;
+ } else if (vs == "error") {
+ severity = HEALTH_ERR;
+ }
+ } else if (ks == "summary") {
+ if (!PyString_Check(v)) {
+ derr << __func__ << " check " << check_name
+ << " summary value not string" << dendl;
+ continue;
+ }
+ summary = PyString_AsString(v);
+ } else if (ks == "detail") {
+ if (!PyList_Check(v)) {
+ derr << __func__ << " check " << check_name
+ << " detail value not list" << dendl;
+ continue;
+ }
+ for (int k = 0; k < PyList_Size(v); ++k) {
+ PyObject *di = PyList_GET_ITEM(v, k);
+ if (!PyString_Check(di)) {
+ derr << __func__ << " check " << check_name
+ << " detail item " << k << " not a string" << dendl;
+ continue;
+ }
+ detail.push_back(PyString_AsString(di));
+ }
+ } else {
+ derr << __func__ << " check " << check_name
+ << " unexpected key " << k << dendl;
+ }
+ }
+ auto& d = out_checks.add(check_name, severity, summary);
+ d.detail.swap(detail);
+ }
+
+ JSONFormatter jf(true);
+ dout(10) << "module " << self->this_module->get_name()
+ << " health checks:\n";
+ out_checks.dump(&jf);
+ jf.flush(*_dout);
+ *_dout << dendl;
+
+ self->py_modules->set_health_checks(self->this_module->get_name(),
+ std::move(out_checks));
+
+ Py_RETURN_NONE;
+}
+
+
+static PyObject*
+ceph_state_get(BaseMgrModule *self, PyObject *args)
+{
+ char *what = NULL;
+ if (!PyArg_ParseTuple(args, "s:ceph_state_get", &what)) {
+ return NULL;
+ }
+
+ return self->py_modules->get_python(what);
+}
+
+
+static PyObject*
+ceph_get_server(BaseMgrModule *self, PyObject *args)
+{
+ char *hostname = NULL;
+ if (!PyArg_ParseTuple(args, "z:ceph_get_server", &hostname)) {
+ return NULL;
+ }
+
+ if (hostname) {
+ return self->py_modules->get_server_python(hostname);
+ } else {
+ return self->py_modules->list_servers_python();
+ }
+}
+
+static PyObject*
+ceph_get_mgr_id(BaseMgrModule *self, PyObject *args)
+{
+ return PyString_FromString(g_conf->name.get_id().c_str());
+}
+
+static PyObject*
+ceph_config_get(BaseMgrModule *self, PyObject *args)
+{
+ char *what = nullptr;
+ if (!PyArg_ParseTuple(args, "s:ceph_config_get", &what)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+
+ std::string value;
+ bool found = self->py_modules->get_config(self->this_module->get_name(),
+ what, &value);
+ if (found) {
+ dout(10) << "ceph_config_get " << what << " found: " << value.c_str() << dendl;
+ return PyString_FromString(value.c_str());
+ } else {
+ dout(4) << "ceph_config_get " << what << " not found " << dendl;
+ Py_RETURN_NONE;
+ }
+}
+
+static PyObject*
+ceph_config_get_prefix(BaseMgrModule *self, PyObject *args)
+{
+ char *prefix = nullptr;
+ if (!PyArg_ParseTuple(args, "s:ceph_config_get", &prefix)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+
+ return self->py_modules->get_config_prefix(self->this_module->get_name(),
+ prefix);
+}
+
+static PyObject*
+ceph_config_set(BaseMgrModule *self, PyObject *args)
+{
+ char *key = nullptr;
+ char *value = nullptr;
+ if (!PyArg_ParseTuple(args, "sz:ceph_config_set", &key, &value)) {
+ return nullptr;
+ }
+ boost::optional<string> val;
+ if (value) {
+ val = value;
+ }
+ self->py_modules->set_config(self->this_module->get_name(), key, val);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject*
+get_metadata(BaseMgrModule *self, PyObject *args)
+{
+ char *svc_name = NULL;
+ char *svc_id = NULL;
+ if (!PyArg_ParseTuple(args, "ss:get_metadata", &svc_name, &svc_id)) {
+ return nullptr;
+ }
+ return self->py_modules->get_metadata_python(svc_name, svc_id);
+}
+
+static PyObject*
+get_daemon_status(BaseMgrModule *self, PyObject *args)
+{
+ char *svc_name = NULL;
+ char *svc_id = NULL;
+ if (!PyArg_ParseTuple(args, "ss:get_daemon_status", &svc_name,
+ &svc_id)) {
+ return nullptr;
+ }
+ return self->py_modules->get_daemon_status_python(svc_name, svc_id);
+}
+
+static PyObject*
+ceph_log(BaseMgrModule *self, PyObject *args)
+{
+ int level = 0;
+ char *record = nullptr;
+ if (!PyArg_ParseTuple(args, "is:log", &level, &record)) {
+ return nullptr;
+ }
+
+ assert(self->this_module);
+
+ self->py_modules->log(self->this_module->get_name(), level, record);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+ceph_get_version(BaseMgrModule *self, PyObject *args)
+{
+ return PyString_FromString(pretty_version_to_str().c_str());
+}
+
+static PyObject *
+ceph_get_context(BaseMgrModule *self, PyObject *args)
+{
+ return self->py_modules->get_context();
+}
+
+static PyObject*
+get_counter(BaseMgrModule *self, PyObject *args)
+{
+ char *svc_name = nullptr;
+ char *svc_id = nullptr;
+ char *counter_path = nullptr;
+ if (!PyArg_ParseTuple(args, "sss:get_counter", &svc_name,
+ &svc_id, &counter_path)) {
+ return nullptr;
+ }
+ return self->py_modules->get_counter_python(
+ svc_name, svc_id, counter_path);
+}
+
+static PyObject*
+get_perf_schema(BaseMgrModule *self, PyObject *args)
+{
+ char *type_str = nullptr;
+ char *svc_id = nullptr;
+ if (!PyArg_ParseTuple(args, "ss:get_perf_schema", &type_str,
+ &svc_id)) {
+ return nullptr;
+ }
+
+ return self->py_modules->get_perf_schema_python(type_str, svc_id);
+}
+
+static PyObject *
+ceph_get_osdmap(BaseMgrModule *self, PyObject *args)
+{
+ return self->py_modules->get_osdmap();
+}
+
+static PyObject*
+ceph_set_uri(BaseMgrModule *self, PyObject *args)
+{
+ char *svc_str = nullptr;
+ if (!PyArg_ParseTuple(args, "s:ceph_advertize_service",
+ &svc_str)) {
+ return nullptr;
+ }
+
+ // We call down into PyModules even though we have a MgrPyModule
+ // reference here, because MgrPyModule's fields are protected
+ // by PyModules' lock.
+ self->py_modules->set_uri(self->this_module->get_name(), svc_str);
+
+ Py_RETURN_NONE;
+}
+
+
+PyMethodDef BaseMgrModule_methods[] = {
+ {"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
+ "Get a cluster object"},
+
+ {"_ceph_get_server", (PyCFunction)ceph_get_server, METH_VARARGS,
+ "Get a server object"},
+
+ {"_ceph_get_metadata", (PyCFunction)get_metadata, METH_VARARGS,
+ "Get a service's metadata"},
+
+ {"_ceph_get_daemon_status", (PyCFunction)get_daemon_status, METH_VARARGS,
+ "Get a service's status"},
+
+ {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS,
+ "Send a mon command"},
+
+ {"_ceph_set_health_checks", (PyCFunction)ceph_set_health_checks, METH_VARARGS,
+ "Set health checks for this module"},
+
+ {"_ceph_get_mgr_id", (PyCFunction)ceph_get_mgr_id, METH_NOARGS,
+ "Get the name of the Mgr daemon where we are running"},
+
+ {"_ceph_get_config", (PyCFunction)ceph_config_get, METH_VARARGS,
+ "Get a configuration value"},
+
+ {"_ceph_get_config_prefix", (PyCFunction)ceph_config_get_prefix, METH_VARARGS,
+ "Get all configuration values with a given prefix"},
+
+ {"_ceph_set_config", (PyCFunction)ceph_config_set, METH_VARARGS,
+ "Set a configuration value"},
+
+ {"_ceph_get_counter", (PyCFunction)get_counter, METH_VARARGS,
+ "Get a performance counter"},
+
+ {"_ceph_get_perf_schema", (PyCFunction)get_perf_schema, METH_VARARGS,
+ "Get the performance counter schema"},
+
+ {"_ceph_log", (PyCFunction)ceph_log, METH_VARARGS,
+ "Emit a (local) log message"},
+
+ {"_ceph_get_version", (PyCFunction)ceph_get_version, METH_VARARGS,
+ "Get the ceph version of this process"},
+
+ {"_ceph_get_context", (PyCFunction)ceph_get_context, METH_NOARGS,
+ "Get a CephContext* in a python capsule"},
+
+ {"_ceph_get_osdmap", (PyCFunction)ceph_get_osdmap, METH_NOARGS,
+ "Get an OSDMap* in a python capsule"},
+
+ {"_ceph_set_uri", (PyCFunction)ceph_set_uri, METH_VARARGS,
+ "Advertize a service URI served by this module"},
+
+ {NULL, NULL, 0, NULL}
+};
+
+
+static PyObject *
+BaseMgrModule_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ BaseMgrModule *self;
+
+ self = (BaseMgrModule *)type->tp_alloc(type, 0);
+
+ return (PyObject *)self;
+}
+
+static int
+BaseMgrModule_init(BaseMgrModule *self, PyObject *args, PyObject *kwds)
+{
+ PyObject *py_modules_capsule = nullptr;
+ PyObject *this_module_capsule = nullptr;
+ static const char *kwlist[] = {"py_modules", "this_module", NULL};
+
+ if (! PyArg_ParseTupleAndKeywords(args, kwds, "OO",
+ const_cast<char**>(kwlist),
+ &py_modules_capsule,
+ &this_module_capsule)) {
+ return -1;
+ }
+
+ self->py_modules = (ActivePyModules*)PyCapsule_GetPointer(
+ py_modules_capsule, nullptr);
+ assert(self->py_modules);
+ self->this_module = (ActivePyModule*)PyCapsule_GetPointer(
+ this_module_capsule, nullptr);
+ assert(self->this_module);
+
+ return 0;
+}
+
+PyTypeObject BaseMgrModuleType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "ceph_module.BaseMgrModule", /* tp_name */
+ sizeof(BaseMgrModule), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ 0, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ "ceph-mgr Python Plugin", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ BaseMgrModule_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc)BaseMgrModule_init, /* tp_init */
+ 0, /* tp_alloc */
+ BaseMgrModule_new, /* tp_new */
+};
+
--- /dev/null
+#ifndef PYSTATE_H_
+#define PYSTATE_H_
+
+#include "Python.h"
+
+extern PyTypeObject BaseMgrModuleType;
+
+#endif
+
#include "mgr/MgrContext.h"
#include "mgr/mgr_commands.h"
-#include "MgrPyModule.h"
+//#include "MgrPyModule.h"
#include "DaemonServer.h"
#include "messages/MMgrBeacon.h"
#include "messages/MMgrDigest.h"
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 John Spray <john.spray@redhat.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-#include "PyState.h"
-#include "PyOSDMap.h"
-
-#include "PyFormatter.h"
-
-#include "common/debug.h"
-
-#include "MgrPyModule.h"
-
-//XXX courtesy of http://stackoverflow.com/questions/1418015/how-to-get-python-exception-text
-#include <boost/python.hpp>
-#include "include/assert.h" // boost clobbers this
-
-
-#define dout_prefix *_dout << "mgr " << __func__ << " "
-
-// decode a Python exception into a string
-std::string handle_pyerror()
-{
- using namespace boost::python;
- using namespace boost;
-
- PyObject *exc, *val, *tb;
- object formatted_list, formatted;
- PyErr_Fetch(&exc, &val, &tb);
- handle<> hexc(exc), hval(allow_null(val)), htb(allow_null(tb));
- object traceback(import("traceback"));
- if (!tb) {
- object format_exception_only(traceback.attr("format_exception_only"));
- formatted_list = format_exception_only(hexc, hval);
- } else {
- object format_exception(traceback.attr("format_exception"));
- formatted_list = format_exception(hexc,hval, htb);
- }
- formatted = str("").join(formatted_list);
- return extract<std::string>(formatted);
-}
-
-
-
-
-void *ServeThread::entry()
-{
- // No need to acquire the GIL here; the module does it.
- dout(4) << "Entering thread for " << mod->get_name() << dendl;
- mod->serve();
- return nullptr;
-}
-
-ActivePyModule::ActivePyModule(const std::string &module_name_,
- PyObject *pClass_,
- PyThreadState *my_ts_)
- : module_name(module_name_),
- pClass(pClass_),
- pMyThreadState(my_ts_),
- thread(this)
-{
-}
-
-ActivePyModule::~ActivePyModule()
-{
- if (pMyThreadState.ts != nullptr) {
- Gil gil(pMyThreadState);
-
- Py_XDECREF(pClassInstance);
-
- //
- // Ideally, now, we'd be able to do this:
- //
- // Py_EndInterpreter(pMyThreadState);
- // PyThreadState_Swap(pMainThreadState);
- //
- // Unfortunately, if the module has any other *python* threads active
- // at this point, Py_EndInterpreter() will abort with:
- //
- // Fatal Python error: Py_EndInterpreter: not the last thread
- //
- // This can happen when using CherryPy in a module, becuase CherryPy
- // runs an extra thread as a timeout monitor, which spends most of its
- // life inside a time.sleep(60). Unless you are very, very lucky with
- // the timing calling this destructor, that thread will still be stuck
- // in a sleep, and Py_EndInterpreter() will abort.
- //
- // This could of course also happen with a poorly written module which
- // made no attempt to clean up any additional threads it created.
- //
- // The safest thing to do is just not call Py_EndInterpreter(), and
- // let Py_Finalize() kill everything after all modules are shut down.
- //
- }
-}
-
-int ActivePyModule::load(ActivePyModules *py_modules)
-{
- Gil gil(pMyThreadState);
-
- // We tell the module how we name it, so that it can be consistent
- // with us in logging etc.
- auto pThisPtr = PyCapsule_New(this, nullptr, nullptr);
- auto pPyModules = PyCapsule_New(py_modules, nullptr, nullptr);
- auto pModuleName = PyString_FromString(module_name.c_str());
- auto pArgs = PyTuple_Pack(3, pModuleName, pPyModules, pThisPtr);
-
- pClassInstance = PyObject_CallObject(pClass, pArgs);
- Py_DECREF(pClass);
- Py_DECREF(pModuleName);
- Py_DECREF(pArgs);
- if (pClassInstance == nullptr) {
- derr << "Failed to construct class in '" << module_name << "'" << dendl;
- derr << handle_pyerror() << dendl;
- return -EINVAL;
- } else {
- dout(1) << "Constructed class from module: " << module_name << dendl;
- }
-
- return load_commands();
-}
-
-int ActivePyModule::serve()
-{
- assert(pClassInstance != nullptr);
-
- // This method is called from a separate OS thread (i.e. a thread not
- // created by Python), so tell Gil to wrap this in a new thread state.
- Gil gil(pMyThreadState, true);
-
- auto pValue = PyObject_CallMethod(pClassInstance,
- const_cast<char*>("serve"), nullptr);
-
- int r = 0;
- if (pValue != NULL) {
- Py_DECREF(pValue);
- } else {
- derr << module_name << ".serve:" << dendl;
- derr << handle_pyerror() << dendl;
- return -EINVAL;
- }
-
- return r;
-}
-
-void ActivePyModule::shutdown()
-{
- assert(pClassInstance != nullptr);
-
- Gil gil(pMyThreadState);
-
- auto pValue = PyObject_CallMethod(pClassInstance,
- const_cast<char*>("shutdown"), nullptr);
-
- if (pValue != NULL) {
- Py_DECREF(pValue);
- } else {
- derr << "Failed to invoke shutdown() on " << module_name << dendl;
- derr << handle_pyerror() << dendl;
- }
-}
-
-void ActivePyModule::notify(const std::string ¬ify_type, const std::string ¬ify_id)
-{
- assert(pClassInstance != nullptr);
-
- Gil gil(pMyThreadState);
-
- // Execute
- auto pValue = PyObject_CallMethod(pClassInstance,
- const_cast<char*>("notify"), const_cast<char*>("(ss)"),
- notify_type.c_str(), notify_id.c_str());
-
- if (pValue != NULL) {
- Py_DECREF(pValue);
- } else {
- derr << module_name << ".notify:" << dendl;
- derr << handle_pyerror() << dendl;
- // FIXME: callers can't be expected to handle a python module
- // that has spontaneously broken, but Mgr() should provide
- // a hook to unload misbehaving modules when they have an
- // error somewhere like this
- }
-}
-
-void ActivePyModule::notify_clog(const LogEntry &log_entry)
-{
- assert(pClassInstance != nullptr);
-
- Gil gil(pMyThreadState);
-
- // Construct python-ized LogEntry
- PyFormatter f;
- log_entry.dump(&f);
- auto py_log_entry = f.get();
-
- // Execute
- auto pValue = PyObject_CallMethod(pClassInstance,
- const_cast<char*>("notify"), const_cast<char*>("(sN)"),
- "clog", py_log_entry);
-
- if (pValue != NULL) {
- Py_DECREF(pValue);
- } else {
- derr << module_name << ".notify_clog:" << dendl;
- derr << handle_pyerror() << dendl;
- // FIXME: callers can't be expected to handle a python module
- // that has spontaneously broken, but Mgr() should provide
- // a hook to unload misbehaving modules when they have an
- // error somewhere like this
- }
-}
-
-int ActivePyModule::load_commands()
-{
- // Don't need a Gil here -- this is called from ActivePyModule::load(),
- // which already has one.
- PyObject *command_list = PyObject_GetAttrString(pClassInstance, "COMMANDS");
- assert(command_list != nullptr);
- const size_t list_size = PyList_Size(command_list);
- for (size_t i = 0; i < list_size; ++i) {
- PyObject *command = PyList_GetItem(command_list, i);
- assert(command != nullptr);
-
- ModuleCommand item;
-
- PyObject *pCmd = PyDict_GetItemString(command, "cmd");
- assert(pCmd != nullptr);
- item.cmdstring = PyString_AsString(pCmd);
-
- dout(20) << "loaded command " << item.cmdstring << dendl;
-
- PyObject *pDesc = PyDict_GetItemString(command, "desc");
- assert(pDesc != nullptr);
- item.helpstring = PyString_AsString(pDesc);
-
- PyObject *pPerm = PyDict_GetItemString(command, "perm");
- assert(pPerm != nullptr);
- item.perm = PyString_AsString(pPerm);
-
- item.handler = this;
-
- commands.push_back(item);
- }
- Py_DECREF(command_list);
-
- dout(10) << "loaded " << commands.size() << " commands" << dendl;
-
- return 0;
-}
-
-int ActivePyModule::handle_command(
- const cmdmap_t &cmdmap,
- std::stringstream *ds,
- std::stringstream *ss)
-{
- assert(ss != nullptr);
- assert(ds != nullptr);
-
- Gil gil(pMyThreadState);
-
- PyFormatter f;
- cmdmap_dump(cmdmap, &f);
- PyObject *py_cmd = f.get();
-
- auto pResult = PyObject_CallMethod(pClassInstance,
- const_cast<char*>("handle_command"), const_cast<char*>("(O)"), py_cmd);
-
- Py_DECREF(py_cmd);
-
- int r = 0;
- if (pResult != NULL) {
- if (PyTuple_Size(pResult) != 3) {
- r = -EINVAL;
- } else {
- r = PyInt_AsLong(PyTuple_GetItem(pResult, 0));
- *ds << PyString_AsString(PyTuple_GetItem(pResult, 1));
- *ss << PyString_AsString(PyTuple_GetItem(pResult, 2));
- }
-
- Py_DECREF(pResult);
- } else {
- *ds << "";
- *ss << handle_pyerror();
- r = -EINVAL;
- }
-
- return r;
-}
-
-void ActivePyModule::get_health_checks(health_check_map_t *checks)
-{
- checks->merge(health_checks);
-}
+++ /dev/null
-
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 John Spray <john.spray@redhat.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-
-#ifndef MGR_PY_MODULE_H_
-#define MGR_PY_MODULE_H_
-
-// Python.h comes first because otherwise it clobbers ceph's assert
-#include "Python.h"
-
-#include "common/cmdparse.h"
-#include "common/LogEntry.h"
-#include "common/Mutex.h"
-#include "common/Thread.h"
-#include "mon/health_check.h"
-#include "mgr/Gil.h"
-
-#include <vector>
-#include <string>
-
-
-class ActivePyModule;
-class ActivePyModules;
-
-/**
- * A Ceph CLI command description provided from a Python module
- */
-class ModuleCommand {
-public:
- std::string cmdstring;
- std::string helpstring;
- std::string perm;
- ActivePyModule *handler;
-};
-
-class ServeThread : public Thread
-{
- ActivePyModule *mod;
-
-public:
- ServeThread(ActivePyModule *mod_)
- : mod(mod_) {}
-
- void *entry() override;
-};
-
-class ActivePyModule
-{
-private:
- const std::string module_name;
-
- // Passed in by whoever loaded our python module and looked up
- // the symbols in it.
- PyObject *pClass = nullptr;
-
- // Passed in by whoever created our subinterpreter for us
- SafeThreadState pMyThreadState = nullptr;
-
- // Populated when we construct our instance of pClass in load()
- PyObject *pClassInstance = nullptr;
-
- health_check_map_t health_checks;
-
- std::vector<ModuleCommand> commands;
-
- int load_commands();
-
- // Optional, URI exposed by plugins that implement serve()
- std::string uri;
-
-public:
- ActivePyModule(const std::string &module_name,
- PyObject *pClass_,
- PyThreadState *my_ts);
- ~ActivePyModule();
-
- ServeThread thread;
-
- int load(ActivePyModules *py_modules);
- int serve();
- void shutdown();
- void notify(const std::string ¬ify_type, const std::string ¬ify_id);
- void notify_clog(const LogEntry &le);
-
- const std::vector<ModuleCommand> &get_commands() const
- {
- return commands;
- }
-
- const std::string &get_name() const
- {
- return module_name;
- }
-
- int handle_command(
- const cmdmap_t &cmdmap,
- std::stringstream *ds,
- std::stringstream *ss);
-
- void set_health_checks(health_check_map_t&& c) {
- health_checks = std::move(c);
- }
- void get_health_checks(health_check_map_t *checks);
-
- void set_uri(const std::string &str)
- {
- uri = str;
- }
-
- std::string get_uri() const
- {
- return uri;
- }
-};
-
-std::string handle_pyerror();
-
-#endif
-
#include "include/stringify.h"
#include "common/errno.h"
-#include "PyState.h"
+#include "BaseMgrModule.h"
+#include "PyOSDMap.h"
#include "Gil.h"
+#include "ActivePyModules.h"
+
#include "PyModuleRegistry.h"
// definition for non-const static member
// Python.h comes first because otherwise it clobbers ceph's assert
#include "Python.h"
-// ActivePyModules
-#include "PyModules.h"
+#include <string>
+#include <map>
+#include <memory>
+
+#include "common/LogClient.h"
+
+#include "ActivePyModules.h"
class PyModule
{
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 John Spray <john.spray@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-// Include this first to get python headers earlier
-#include "PyState.h"
-#include "Gil.h"
-
-#include "common/errno.h"
-#include "include/stringify.h"
-
-#include "PyFormatter.h"
-
-#include "osd/OSDMap.h"
-#include "mon/MonMap.h"
-
-#include "mgr/MgrContext.h"
-
-// For ::config_prefix
-#include "PyModuleRegistry.h"
-
-#include "PyModules.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_mgr
-#undef dout_prefix
-#define dout_prefix *_dout << "mgr " << __func__ << " "
-
-// constructor/destructor implementations cannot be in .h,
-// because ServeThread is still an "incomplete" type there
-
-ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
- DaemonStateIndex &ds, ClusterState &cs,
- MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
- Client &client_, Finisher &f)
- : config_cache(config_), daemon_state(ds), cluster_state(cs),
- monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
- lock("ActivePyModules")
-{}
-
-ActivePyModules::~ActivePyModules() = default;
-
-void ActivePyModules::dump_server(const std::string &hostname,
- const DaemonStateCollection &dmc,
- Formatter *f)
-{
- f->dump_string("hostname", hostname);
- f->open_array_section("services");
- std::string ceph_version;
-
- for (const auto &i : dmc) {
- Mutex::Locker l(i.second->lock);
- const auto &key = i.first;
- const std::string &str_type = key.first;
- const std::string &svc_name = key.second;
-
- // TODO: pick the highest version, and make sure that
- // somewhere else (during health reporting?) we are
- // indicating to the user if we see mixed versions
- auto ver_iter = i.second->metadata.find("ceph_version");
- if (ver_iter != i.second->metadata.end()) {
- ceph_version = i.second->metadata.at("ceph_version");
- }
-
- f->open_object_section("service");
- f->dump_string("type", str_type);
- f->dump_string("id", svc_name);
- f->close_section();
- }
- f->close_section();
-
- f->dump_string("ceph_version", ceph_version);
-}
-
-
-
-PyObject *ActivePyModules::get_server_python(const std::string &hostname)
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
- dout(10) << " (" << hostname << ")" << dendl;
-
- auto dmc = daemon_state.get_by_server(hostname);
-
- PyFormatter f;
- dump_server(hostname, dmc, &f);
- return f.get();
-}
-
-
-PyObject *ActivePyModules::list_servers_python()
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
- dout(10) << " >" << dendl;
-
- PyFormatter f(false, true);
- daemon_state.with_daemons_by_server([this, &f]
- (const std::map<std::string, DaemonStateCollection> &all) {
- for (const auto &i : all) {
- const auto &hostname = i.first;
-
- f.open_object_section("server");
- dump_server(hostname, i.second, &f);
- f.close_section();
- }
- });
-
- return f.get();
-}
-
-PyObject *ActivePyModules::get_metadata_python(
- const std::string &svc_type,
- const std::string &svc_id)
-{
- auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
- if (metadata == nullptr) {
- derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
- Py_RETURN_NONE;
- }
-
- Mutex::Locker l(metadata->lock);
- PyFormatter f;
- f.dump_string("hostname", metadata->hostname);
- for (const auto &i : metadata->metadata) {
- f.dump_string(i.first.c_str(), i.second);
- }
-
- return f.get();
-}
-
-PyObject *ActivePyModules::get_daemon_status_python(
- const std::string &svc_type,
- const std::string &svc_id)
-{
- auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
- if (metadata == nullptr) {
- derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
- Py_RETURN_NONE;
- }
-
- Mutex::Locker l(metadata->lock);
- PyFormatter f;
- for (const auto &i : metadata->service_status) {
- f.dump_string(i.first.c_str(), i.second);
- }
- return f.get();
-}
-
-PyObject *ActivePyModules::get_python(const std::string &what)
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- if (what == "fs_map") {
- PyFormatter f;
- cluster_state.with_fsmap([&f](const FSMap &fsmap) {
- fsmap.dump(&f);
- });
- return f.get();
- } else if (what == "osdmap_crush_map_text") {
- bufferlist rdata;
- cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
- osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
- });
- std::string crush_text = rdata.to_str();
- return PyString_FromString(crush_text.c_str());
- } else if (what.substr(0, 7) == "osd_map") {
- PyFormatter f;
- cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
- if (what == "osd_map") {
- osd_map.dump(&f);
- } else if (what == "osd_map_tree") {
- osd_map.print_tree(&f, nullptr);
- } else if (what == "osd_map_crush") {
- osd_map.crush->dump(&f);
- }
- });
- return f.get();
- } else if (what == "config") {
- PyFormatter f;
- g_conf->show_config(&f);
- return f.get();
- } else if (what == "mon_map") {
- PyFormatter f;
- cluster_state.with_monmap(
- [&f](const MonMap &monmap) {
- monmap.dump(&f);
- }
- );
- return f.get();
- } else if (what == "service_map") {
- PyFormatter f;
- cluster_state.with_servicemap(
- [&f](const ServiceMap &service_map) {
- service_map.dump(&f);
- }
- );
- return f.get();
- } else if (what == "osd_metadata") {
- PyFormatter f;
- auto dmc = daemon_state.get_by_service("osd");
- for (const auto &i : dmc) {
- Mutex::Locker l(i.second->lock);
- f.open_object_section(i.first.second.c_str());
- f.dump_string("hostname", i.second->hostname);
- for (const auto &j : i.second->metadata) {
- f.dump_string(j.first.c_str(), j.second);
- }
- f.close_section();
- }
- return f.get();
- } else if (what == "pg_summary") {
- PyFormatter f;
- cluster_state.with_pgmap(
- [&f](const PGMap &pg_map) {
- std::map<std::string, std::map<std::string, uint32_t> > osds;
- std::map<std::string, std::map<std::string, uint32_t> > pools;
- std::map<std::string, uint32_t> all;
- for (const auto &i : pg_map.pg_stat) {
- const auto pool = i.first.m_pool;
- const std::string state = pg_state_string(i.second.state);
- // Insert to per-pool map
- pools[stringify(pool)][state]++;
- for (const auto &osd_id : i.second.acting) {
- osds[stringify(osd_id)][state]++;
- }
- all[state]++;
- }
- f.open_object_section("by_osd");
- for (const auto &i : osds) {
- f.open_object_section(i.first.c_str());
- for (const auto &j : i.second) {
- f.dump_int(j.first.c_str(), j.second);
- }
- f.close_section();
- }
- f.close_section();
- f.open_object_section("by_pool");
- for (const auto &i : pools) {
- f.open_object_section(i.first.c_str());
- for (const auto &j : i.second) {
- f.dump_int(j.first.c_str(), j.second);
- }
- f.close_section();
- }
- f.close_section();
- f.open_object_section("all");
- for (const auto &i : all) {
- f.dump_int(i.first.c_str(), i.second);
- }
- f.close_section();
- }
- );
- return f.get();
- } else if (what == "pg_status") {
- PyFormatter f;
- cluster_state.with_pgmap(
- [&f](const PGMap &pg_map) {
- pg_map.print_summary(&f, nullptr);
- }
- );
- return f.get();
- } else if (what == "pg_dump") {
- PyFormatter f;
- cluster_state.with_pgmap(
- [&f](const PGMap &pg_map) {
- pg_map.dump(&f);
- }
- );
- return f.get();
- } else if (what == "df") {
- PyFormatter f;
-
- cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
- cluster_state.with_pgmap(
- [&osd_map, &f](const PGMap &pg_map) {
- pg_map.dump_fs_stats(nullptr, &f, true);
- pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
- });
- });
- return f.get();
- } else if (what == "osd_stats") {
- PyFormatter f;
- cluster_state.with_pgmap(
- [&f](const PGMap &pg_map) {
- pg_map.dump_osd_stats(&f);
- });
- return f.get();
- } else if (what == "health" || what == "mon_status") {
- PyFormatter f;
- bufferlist json;
- if (what == "health") {
- json = cluster_state.get_health();
- } else if (what == "mon_status") {
- json = cluster_state.get_mon_status();
- } else {
- ceph_abort();
- }
- f.dump_string("json", json.to_str());
- return f.get();
- } else if (what == "mgr_map") {
- PyFormatter f;
- cluster_state.with_mgrmap([&f](const MgrMap &mgr_map) {
- mgr_map.dump(&f);
- });
- return f.get();
- } else {
- derr << "Python module requested unknown data '" << what << "'" << dendl;
- Py_RETURN_NONE;
- }
-}
-
-void ActivePyModules::start_one(std::string const &module_name,
- PyObject *pClass, PyThreadState *pMyThreadState)
-{
- Mutex::Locker l(lock);
-
- assert(modules.count(module_name) == 0);
-
- modules[module_name].reset(new ActivePyModule(
- module_name, pClass,
- pMyThreadState));
-
- int r = modules[module_name]->load(this);
-
- // FIXME error handling
- assert(r == 0);
-
- std::ostringstream thread_name;
- thread_name << "mgr." << module_name;
- dout(4) << "Starting thread for " << module_name << dendl;
- modules[module_name]->thread.create(thread_name.str().c_str());
-}
-
-void ActivePyModules::shutdown()
-{
- Mutex::Locker locker(lock);
-
- // Signal modules to drop out of serve() and/or tear down resources
- for (auto &i : modules) {
- auto module = i.second.get();
- const auto& name = i.first;
- dout(10) << "waiting for module " << name << " to shutdown" << dendl;
- lock.Unlock();
- module->shutdown();
- lock.Lock();
- dout(10) << "module " << name << " shutdown" << dendl;
- }
-
- // For modules implementing serve(), finish the threads where we
- // were running that.
- for (auto &i : modules) {
- lock.Unlock();
- i.second->thread.join();
- lock.Lock();
- }
-
- modules.clear();
-}
-
-void ActivePyModules::notify_all(const std::string ¬ify_type,
- const std::string ¬ify_id)
-{
- Mutex::Locker l(lock);
-
- dout(10) << __func__ << ": notify_all " << notify_type << dendl;
- for (auto& i : modules) {
- auto module = i.second.get();
- // Send all python calls down a Finisher to avoid blocking
- // C++ code, and avoid any potential lock cycles.
- finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
- module->notify(notify_type, notify_id);
- }));
- }
-}
-
-void ActivePyModules::notify_all(const LogEntry &log_entry)
-{
- Mutex::Locker l(lock);
-
- dout(10) << __func__ << ": notify_all (clog)" << dendl;
- for (auto& i : modules) {
- auto module = i.second.get();
- // Send all python calls down a Finisher to avoid blocking
- // C++ code, and avoid any potential lock cycles.
- //
- // Note intentional use of non-reference lambda binding on
- // log_entry: we take a copy because caller's instance is
- // probably ephemeral.
- finisher.queue(new FunctionContext([module, log_entry](int r){
- module->notify_clog(log_entry);
- }));
- }
-}
-
-bool ActivePyModules::get_config(const std::string &module_name,
- const std::string &key, std::string *val) const
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- const std::string global_key = PyModuleRegistry::config_prefix
- + module_name + "/" + key;
-
- dout(4) << __func__ << "key: " << global_key << dendl;
-
- if (config_cache.count(global_key)) {
- *val = config_cache.at(global_key);
- return true;
- } else {
- return false;
- }
-}
-
-PyObject *ActivePyModules::get_config_prefix(const std::string &module_name,
- const std::string &prefix) const
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- const std::string base_prefix = PyModuleRegistry::config_prefix
- + module_name + "/";
- const std::string global_prefix = base_prefix + prefix;
- dout(4) << __func__ << "prefix: " << global_prefix << dendl;
-
- PyFormatter f;
- for (auto p = config_cache.lower_bound(global_prefix);
- p != config_cache.end() && p->first.find(global_prefix) == 0;
- ++p) {
- f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
- }
- return f.get();
-}
-
-void ActivePyModules::set_config(const std::string &module_name,
- const std::string &key, const boost::optional<std::string>& val)
-{
- const std::string global_key = PyModuleRegistry::config_prefix
- + module_name + "/" + key;
-
- Command set_cmd;
- {
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
- if (val) {
- config_cache[global_key] = *val;
- } else {
- config_cache.erase(global_key);
- }
-
- std::ostringstream cmd_json;
- JSONFormatter jf;
- jf.open_object_section("cmd");
- if (val) {
- jf.dump_string("prefix", "config-key set");
- jf.dump_string("key", global_key);
- jf.dump_string("val", *val);
- } else {
- jf.dump_string("prefix", "config-key del");
- jf.dump_string("key", global_key);
- }
- jf.close_section();
- jf.flush(cmd_json);
- set_cmd.run(&monc, cmd_json.str());
- }
- set_cmd.wait();
-
- if (set_cmd.r != 0) {
- // config-key set will fail if mgr's auth key has insufficient
- // permission to set config keys
- // FIXME: should this somehow raise an exception back into Python land?
- dout(0) << "`config-key set " << global_key << " " << val << "` failed: "
- << cpp_strerror(set_cmd.r) << dendl;
- dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
- }
-}
-
-std::vector<ModuleCommand> ActivePyModules::get_py_commands() const
-{
- Mutex::Locker l(lock);
-
- std::vector<ModuleCommand> result;
- for (const auto& i : modules) {
- auto module = i.second.get();
- auto mod_commands = module->get_commands();
- for (auto j : mod_commands) {
- result.push_back(j);
- }
- }
-
- return result;
-}
-
-std::vector<MonCommand> ActivePyModules::get_commands() const
-{
- std::vector<ModuleCommand> commands = get_py_commands();
- std::vector<MonCommand> result;
- for (auto &pyc: commands) {
- result.push_back({pyc.cmdstring, pyc.helpstring, "mgr",
- pyc.perm, "cli", MonCommand::FLAG_MGR});
- }
- return result;
-}
-
-
-std::map<std::string, std::string> ActivePyModules::get_services() const
-{
- std::map<std::string, std::string> result;
- Mutex::Locker l(lock);
- for (const auto& i : modules) {
- const auto &module = i.second.get();
- std::string svc_str = module->get_uri();
- if (!svc_str.empty()) {
- result[module->get_name()] = svc_str;
- }
- }
-
- return result;
-}
-
-void ActivePyModules::log(const std::string &module_name,
- int level, const std::string &record)
-{
-#undef dout_prefix
-#define dout_prefix *_dout << "mgr[" << module_name << "] "
- dout(level) << record << dendl;
-#undef dout_prefix
-#define dout_prefix *_dout << "mgr " << __func__ << " "
-}
-
-PyObject* ActivePyModules::get_counter_python(
- const std::string &svc_name,
- const std::string &svc_id,
- const std::string &path)
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- PyFormatter f;
- f.open_array_section(path.c_str());
-
- auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
- if (metadata) {
- Mutex::Locker l2(metadata->lock);
- if (metadata->perf_counters.instances.count(path)) {
- auto counter_instance = metadata->perf_counters.instances.at(path);
- const auto &data = counter_instance.get_data();
- for (const auto &datapoint : data) {
- f.open_array_section("datapoint");
- f.dump_unsigned("t", datapoint.t.sec());
- f.dump_unsigned("v", datapoint.v);
- f.close_section();
-
- }
- } else {
- dout(4) << "Missing counter: '" << path << "' ("
- << svc_name << "." << svc_id << ")" << dendl;
- dout(20) << "Paths are:" << dendl;
- for (const auto &i : metadata->perf_counters.instances) {
- dout(20) << i.first << dendl;
- }
- }
- } else {
- dout(4) << "No daemon state for "
- << svc_name << "." << svc_id << ")" << dendl;
- }
- f.close_section();
- return f.get();
-}
-
-PyObject* ActivePyModules::get_perf_schema_python(
- const std::string svc_type,
- const std::string &svc_id)
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- DaemonStateCollection daemons;
-
- if (svc_type == "") {
- daemons = daemon_state.get_all();
- } else if (svc_id.empty()) {
- daemons = daemon_state.get_by_service(svc_type);
- } else {
- auto key = DaemonKey(svc_type, svc_id);
- // so that the below can be a loop in all cases
- auto got = daemon_state.get(key);
- if (got != nullptr) {
- daemons[key] = got;
- }
- }
-
- PyFormatter f;
- if (!daemons.empty()) {
- for (auto statepair : daemons) {
- auto key = statepair.first;
- auto state = statepair.second;
-
- std::ostringstream daemon_name;
- daemon_name << key.first << "." << key.second;
- f.open_object_section(daemon_name.str().c_str());
-
- Mutex::Locker l(state->lock);
- for (auto ctr_inst_iter : state->perf_counters.instances) {
- const auto &counter_name = ctr_inst_iter.first;
- f.open_object_section(counter_name.c_str());
- auto type = state->perf_counters.types[counter_name];
- f.dump_string("description", type.description);
- if (!type.nick.empty()) {
- f.dump_string("nick", type.nick);
- }
- f.dump_unsigned("type", type.type);
- f.dump_unsigned("priority", type.priority);
- f.close_section();
- }
- f.close_section();
- }
- } else {
- dout(4) << __func__ << ": No daemon state found for "
- << svc_type << "." << svc_id << ")" << dendl;
- }
- return f.get();
-}
-
-PyObject *ActivePyModules::get_context()
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- // Construct a capsule containing ceph context.
- // Not incrementing/decrementing ref count on the context because
- // it's the global one and it has process lifetime.
- auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
- return capsule;
-}
-
-static void delete_osdmap(PyObject *object)
-{
- OSDMap *osdmap = static_cast<OSDMap*>(PyCapsule_GetPointer(object, nullptr));
- assert(osdmap);
- dout(10) << __func__ << " " << osdmap << dendl;
- delete osdmap;
-}
-
-PyObject *ActivePyModules::get_osdmap()
-{
- PyThreadState *tstate = PyEval_SaveThread();
- Mutex::Locker l(lock);
- PyEval_RestoreThread(tstate);
-
- // Construct a capsule containing an OSDMap.
- OSDMap *newmap = new OSDMap;
- cluster_state.with_osdmap([&](const OSDMap& o) {
- newmap->deepish_copy_from(o);
- });
- dout(10) << __func__ << " " << newmap << dendl;
- return PyCapsule_New(newmap, nullptr, &delete_osdmap);
-}
-
-void ActivePyModules::set_health_checks(const std::string& module_name,
- health_check_map_t&& checks)
-{
- Mutex::Locker l(lock);
- auto p = modules.find(module_name);
- if (p != modules.end()) {
- p->second->set_health_checks(std::move(checks));
- }
-}
-
-void ActivePyModules::get_health_checks(health_check_map_t *checks)
-{
- Mutex::Locker l(lock);
- for (auto& p : modules) {
- p.second->get_health_checks(checks);
- }
-}
-
-void ActivePyModules::set_uri(const std::string& module_name,
- const std::string &uri)
-{
- Mutex::Locker l(lock);
-
- dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;
-
- modules[module_name]->set_uri(uri);
-}
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 John Spray <john.spray@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-#ifndef PY_MODULES_H_
-#define PY_MODULES_H_
-
-#include "MgrPyModule.h"
-
-#include "common/Finisher.h"
-#include "common/Mutex.h"
-
-#include "osdc/Objecter.h"
-#include "client/Client.h"
-#include "common/LogClient.h"
-#include "mon/MgrMap.h"
-#include "mon/MonCommand.h"
-
-#include "DaemonState.h"
-#include "ClusterState.h"
-
-class ServeThread;
-class health_check_map_t;
-
-typedef std::map<std::string, std::string> PyModuleConfig;
-
-class ActivePyModules
-{
-
- std::map<std::string, std::unique_ptr<ActivePyModule>> modules;
- PyModuleConfig config_cache;
- DaemonStateIndex &daemon_state;
- ClusterState &cluster_state;
- MonClient &monc;
- LogChannelRef clog;
- Objecter &objecter;
- Client &client;
- Finisher &finisher;
-
-
- mutable Mutex lock{"ActivePyModules::lock"};
-
-public:
- ActivePyModules(PyModuleConfig const &config_,
- DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
- LogChannelRef clog_, Objecter &objecter_, Client &client_,
- Finisher &f);
-
- ~ActivePyModules();
-
- // FIXME: wrap for send_command?
- MonClient &get_monc() {return monc;}
- Objecter &get_objecter() {return objecter;}
- Client &get_client() {return client;}
-
- PyObject *get_python(const std::string &what);
- PyObject *get_server_python(const std::string &hostname);
- PyObject *list_servers_python();
- PyObject *get_metadata_python(
- const std::string &svc_type, const std::string &svc_id);
- PyObject *get_daemon_status_python(
- const std::string &svc_type, const std::string &svc_id);
- PyObject *get_counter_python(
- const std::string &svc_type,
- const std::string &svc_id,
- const std::string &path);
- PyObject *get_perf_schema_python(
- const std::string svc_type,
- const std::string &svc_id);
- PyObject *get_context();
- PyObject *get_osdmap();
-
- bool get_config(const std::string &module_name,
- const std::string &key, std::string *val) const;
- PyObject *get_config_prefix(const std::string &module_name,
- const std::string &prefix) const;
- void set_config(const std::string &module_name,
- const std::string &key, const boost::optional<std::string> &val);
-
- void set_health_checks(const std::string& module_name,
- health_check_map_t&& checks);
- void get_health_checks(health_check_map_t *checks);
-
- void set_uri(const std::string& module_name, const std::string &uri);
-
- void log(const std::string &module_name,
- int level, const std::string &record);
-
- // Python command definitions, including callback
- std::vector<ModuleCommand> get_py_commands() const;
-
- // Monitor command definitions, suitable for CLI
- std::vector<MonCommand> get_commands() const;
-
- std::map<std::string, std::string> get_services() const;
-
- // Public so that MonCommandCompletion can use it
- // FIXME: for send_command completion notifications,
- // send it to only the module that sent the command, not everyone
- void notify_all(const std::string ¬ify_type,
- const std::string ¬ify_id);
- void notify_all(const LogEntry &log_entry);
-
- int init();
- void shutdown();
-
- void start_one(std::string const &module_name,
- PyObject *pClass,
- PyThreadState *pMyThreadState);
-
- void dump_server(const std::string &hostname,
- const DaemonStateCollection &dmc,
- Formatter *f);
-};
-
-#endif
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 John Spray <john.spray@redhat.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-/**
- * The interface we present to python code that runs within
- * ceph-mgr. This is implemented as a Python class from which
- * all modules must inherit -- access to the Ceph state is then
- * available as methods on that object.
- */
-
-#include "Mgr.h"
-
-#include "mon/MonClient.h"
-#include "common/errno.h"
-#include "common/version.h"
-
-#include "PyState.h"
-#include "Gil.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_mgr
-
-#define PLACEHOLDER ""
-
-
-typedef struct {
- PyObject_HEAD
- ActivePyModules *py_modules;
- ActivePyModule *this_module;
-} BaseMgrModule;
-
-class MonCommandCompletion : public Context
-{
- ActivePyModules *py_modules;
- PyObject *python_completion;
- const std::string tag;
- SafeThreadState pThreadState;
-
-public:
- std::string outs;
- bufferlist outbl;
-
- MonCommandCompletion(
- ActivePyModules *py_modules_, PyObject* ev,
- const std::string &tag_, PyThreadState *ts_)
- : py_modules(py_modules_), python_completion(ev),
- tag(tag_), pThreadState(ts_)
- {
- assert(python_completion != nullptr);
- Py_INCREF(python_completion);
- }
-
- ~MonCommandCompletion() override
- {
- Py_DECREF(python_completion);
- }
-
- void finish(int r) override
- {
- dout(10) << "MonCommandCompletion::finish()" << dendl;
- {
- // Scoped so the Gil is released before calling notify_all()
- // Create new thread state because this is called via the MonClient
- // Finisher, not the PyModules finisher.
- Gil gil(pThreadState, true);
-
- auto set_fn = PyObject_GetAttrString(python_completion, "complete");
- assert(set_fn != nullptr);
-
- auto pyR = PyInt_FromLong(r);
- auto pyOutBl = PyString_FromString(outbl.to_str().c_str());
- auto pyOutS = PyString_FromString(outs.c_str());
- auto args = PyTuple_Pack(3, pyR, pyOutBl, pyOutS);
- Py_DECREF(pyR);
- Py_DECREF(pyOutBl);
- Py_DECREF(pyOutS);
-
- auto rtn = PyObject_CallObject(set_fn, args);
- if (rtn != nullptr) {
- Py_DECREF(rtn);
- }
- Py_DECREF(args);
- }
- py_modules->notify_all("command", tag);
- }
-};
-
-
-static PyObject*
-ceph_send_command(BaseMgrModule *self, PyObject *args)
-{
- // Like mon, osd, mds
- char *type = nullptr;
-
- // Like "23" for an OSD or "myid" for an MDS
- char *name = nullptr;
-
- char *cmd_json = nullptr;
- char *tag = nullptr;
- PyObject *completion = nullptr;
- if (!PyArg_ParseTuple(args, "Ossss:ceph_send_command",
- &completion, &type, &name, &cmd_json, &tag)) {
- return nullptr;
- }
-
- auto set_fn = PyObject_GetAttrString(completion, "complete");
- if (set_fn == nullptr) {
- ceph_abort(); // TODO raise python exception instead
- } else {
- assert(PyCallable_Check(set_fn));
- }
- Py_DECREF(set_fn);
-
- auto c = new MonCommandCompletion(self->py_modules,
- completion, tag, PyThreadState_Get());
- if (std::string(type) == "mon") {
- self->py_modules->get_monc().start_mon_command(
- {cmd_json},
- {},
- &c->outbl,
- &c->outs,
- c);
- } else if (std::string(type) == "osd") {
- std::string err;
- uint64_t osd_id = strict_strtoll(name, 10, &err);
- if (!err.empty()) {
- delete c;
- string msg("invalid osd_id: ");
- msg.append("\"").append(name).append("\"");
- PyErr_SetString(PyExc_ValueError, msg.c_str());
- return nullptr;
- }
-
- ceph_tid_t tid;
- self->py_modules->get_objecter().osd_command(
- osd_id,
- {cmd_json},
- {},
- &tid,
- &c->outbl,
- &c->outs,
- c);
- } else if (std::string(type) == "mds") {
- int r = self->py_modules->get_client().mds_command(
- name,
- {cmd_json},
- {},
- &c->outbl,
- &c->outs,
- c);
- if (r != 0) {
- string msg("failed to send command to mds: ");
- msg.append(cpp_strerror(r));
- PyErr_SetString(PyExc_RuntimeError, msg.c_str());
- return nullptr;
- }
- } else if (std::string(type) == "pg") {
- pg_t pgid;
- if (!pgid.parse(name)) {
- delete c;
- string msg("invalid pgid: ");
- msg.append("\"").append(name).append("\"");
- PyErr_SetString(PyExc_ValueError, msg.c_str());
- return nullptr;
- }
-
- ceph_tid_t tid;
- self->py_modules->get_objecter().pg_command(
- pgid,
- {cmd_json},
- {},
- &tid,
- &c->outbl,
- &c->outs,
- c);
- return nullptr;
- } else {
- delete c;
- string msg("unknown service type: ");
- msg.append(type);
- PyErr_SetString(PyExc_ValueError, msg.c_str());
- return nullptr;
- }
-
- Py_RETURN_NONE;
-}
-
-static PyObject*
-ceph_set_health_checks(BaseMgrModule *self, PyObject *args)
-{
- PyObject *checks = NULL;
- if (!PyArg_ParseTuple(args, "O:ceph_set_health_checks", &checks)) {
- return NULL;
- }
- if (!PyDict_Check(checks)) {
- derr << __func__ << " arg not a dict" << dendl;
- Py_RETURN_NONE;
- }
- PyObject *checksls = PyDict_Items(checks);
- health_check_map_t out_checks;
- for (int i = 0; i < PyList_Size(checksls); ++i) {
- PyObject *kv = PyList_GET_ITEM(checksls, i);
- char *check_name = nullptr;
- PyObject *check_info = nullptr;
- if (!PyArg_ParseTuple(kv, "sO:pair", &check_name, &check_info)) {
- derr << __func__ << " dict item " << i
- << " not a size 2 tuple" << dendl;
- continue;
- }
- if (!PyDict_Check(check_info)) {
- derr << __func__ << " item " << i << " " << check_name
- << " value not a dict" << dendl;
- continue;
- }
- health_status_t severity = HEALTH_OK;
- string summary;
- list<string> detail;
- PyObject *infols = PyDict_Items(check_info);
- for (int j = 0; j < PyList_Size(infols); ++j) {
- PyObject *pair = PyList_GET_ITEM(infols, j);
- if (!PyTuple_Check(pair)) {
- derr << __func__ << " item " << i << " pair " << j
- << " not a tuple" << dendl;
- continue;
- }
- char *k = nullptr;
- PyObject *v = nullptr;
- if (!PyArg_ParseTuple(pair, "sO:pair", &k, &v)) {
- derr << __func__ << " item " << i << " pair " << j
- << " not a size 2 tuple" << dendl;
- continue;
- }
- string ks(k);
- if (ks == "severity") {
- if (!PyString_Check(v)) {
- derr << __func__ << " check " << check_name
- << " severity value not string" << dendl;
- continue;
- }
- string vs(PyString_AsString(v));
- if (vs == "warning") {
- severity = HEALTH_WARN;
- } else if (vs == "error") {
- severity = HEALTH_ERR;
- }
- } else if (ks == "summary") {
- if (!PyString_Check(v)) {
- derr << __func__ << " check " << check_name
- << " summary value not string" << dendl;
- continue;
- }
- summary = PyString_AsString(v);
- } else if (ks == "detail") {
- if (!PyList_Check(v)) {
- derr << __func__ << " check " << check_name
- << " detail value not list" << dendl;
- continue;
- }
- for (int k = 0; k < PyList_Size(v); ++k) {
- PyObject *di = PyList_GET_ITEM(v, k);
- if (!PyString_Check(di)) {
- derr << __func__ << " check " << check_name
- << " detail item " << k << " not a string" << dendl;
- continue;
- }
- detail.push_back(PyString_AsString(di));
- }
- } else {
- derr << __func__ << " check " << check_name
- << " unexpected key " << k << dendl;
- }
- }
- auto& d = out_checks.add(check_name, severity, summary);
- d.detail.swap(detail);
- }
-
- JSONFormatter jf(true);
- dout(10) << "module " << self->this_module->get_name()
- << " health checks:\n";
- out_checks.dump(&jf);
- jf.flush(*_dout);
- *_dout << dendl;
-
- self->py_modules->set_health_checks(self->this_module->get_name(),
- std::move(out_checks));
-
- Py_RETURN_NONE;
-}
-
-
-static PyObject*
-ceph_state_get(BaseMgrModule *self, PyObject *args)
-{
- char *what = NULL;
- if (!PyArg_ParseTuple(args, "s:ceph_state_get", &what)) {
- return NULL;
- }
-
- return self->py_modules->get_python(what);
-}
-
-
-static PyObject*
-ceph_get_server(BaseMgrModule *self, PyObject *args)
-{
- char *hostname = NULL;
- if (!PyArg_ParseTuple(args, "z:ceph_get_server", &hostname)) {
- return NULL;
- }
-
- if (hostname) {
- return self->py_modules->get_server_python(hostname);
- } else {
- return self->py_modules->list_servers_python();
- }
-}
-
-static PyObject*
-ceph_get_mgr_id(BaseMgrModule *self, PyObject *args)
-{
- return PyString_FromString(g_conf->name.get_id().c_str());
-}
-
-static PyObject*
-ceph_config_get(BaseMgrModule *self, PyObject *args)
-{
- char *what = nullptr;
- if (!PyArg_ParseTuple(args, "s:ceph_config_get", &what)) {
- derr << "Invalid args!" << dendl;
- return nullptr;
- }
-
- std::string value;
- bool found = self->py_modules->get_config(self->this_module->get_name(),
- what, &value);
- if (found) {
- dout(10) << "ceph_config_get " << what << " found: " << value.c_str() << dendl;
- return PyString_FromString(value.c_str());
- } else {
- dout(4) << "ceph_config_get " << what << " not found " << dendl;
- Py_RETURN_NONE;
- }
-}
-
-static PyObject*
-ceph_config_get_prefix(BaseMgrModule *self, PyObject *args)
-{
- char *prefix = nullptr;
- if (!PyArg_ParseTuple(args, "s:ceph_config_get", &prefix)) {
- derr << "Invalid args!" << dendl;
- return nullptr;
- }
-
- return self->py_modules->get_config_prefix(self->this_module->get_name(),
- prefix);
-}
-
-static PyObject*
-ceph_config_set(BaseMgrModule *self, PyObject *args)
-{
- char *key = nullptr;
- char *value = nullptr;
- if (!PyArg_ParseTuple(args, "sz:ceph_config_set", &key, &value)) {
- return nullptr;
- }
- boost::optional<string> val;
- if (value) {
- val = value;
- }
- self->py_modules->set_config(self->this_module->get_name(), key, val);
-
- Py_RETURN_NONE;
-}
-
-static PyObject*
-get_metadata(BaseMgrModule *self, PyObject *args)
-{
- char *svc_name = NULL;
- char *svc_id = NULL;
- if (!PyArg_ParseTuple(args, "ss:get_metadata", &svc_name, &svc_id)) {
- return nullptr;
- }
- return self->py_modules->get_metadata_python(svc_name, svc_id);
-}
-
-static PyObject*
-get_daemon_status(BaseMgrModule *self, PyObject *args)
-{
- char *svc_name = NULL;
- char *svc_id = NULL;
- if (!PyArg_ParseTuple(args, "ss:get_daemon_status", &svc_name,
- &svc_id)) {
- return nullptr;
- }
- return self->py_modules->get_daemon_status_python(svc_name, svc_id);
-}
-
-static PyObject*
-ceph_log(BaseMgrModule *self, PyObject *args)
-{
- int level = 0;
- char *record = nullptr;
- if (!PyArg_ParseTuple(args, "is:log", &level, &record)) {
- return nullptr;
- }
-
- assert(self->this_module);
-
- self->py_modules->log(self->this_module->get_name(), level, record);
-
- Py_RETURN_NONE;
-}
-
-static PyObject *
-ceph_get_version(BaseMgrModule *self, PyObject *args)
-{
- return PyString_FromString(pretty_version_to_str().c_str());
-}
-
-static PyObject *
-ceph_get_context(BaseMgrModule *self, PyObject *args)
-{
- return self->py_modules->get_context();
-}
-
-static PyObject*
-get_counter(BaseMgrModule *self, PyObject *args)
-{
- char *svc_name = nullptr;
- char *svc_id = nullptr;
- char *counter_path = nullptr;
- if (!PyArg_ParseTuple(args, "sss:get_counter", &svc_name,
- &svc_id, &counter_path)) {
- return nullptr;
- }
- return self->py_modules->get_counter_python(
- svc_name, svc_id, counter_path);
-}
-
-static PyObject*
-get_perf_schema(BaseMgrModule *self, PyObject *args)
-{
- char *type_str = nullptr;
- char *svc_id = nullptr;
- if (!PyArg_ParseTuple(args, "ss:get_perf_schema", &type_str,
- &svc_id)) {
- return nullptr;
- }
-
- return self->py_modules->get_perf_schema_python(type_str, svc_id);
-}
-
-static PyObject *
-ceph_get_osdmap(BaseMgrModule *self, PyObject *args)
-{
- return self->py_modules->get_osdmap();
-}
-
-static PyObject*
-ceph_set_uri(BaseMgrModule *self, PyObject *args)
-{
- char *svc_str = nullptr;
- if (!PyArg_ParseTuple(args, "s:ceph_advertize_service",
- &svc_str)) {
- return nullptr;
- }
-
- // We call down into PyModules even though we have a MgrPyModule
- // reference here, because MgrPyModule's fields are protected
- // by PyModules' lock.
- self->py_modules->set_uri(self->this_module->get_name(), svc_str);
-
- Py_RETURN_NONE;
-}
-
-
-PyMethodDef BaseMgrModule_methods[] = {
- {"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
- "Get a cluster object"},
-
- {"_ceph_get_server", (PyCFunction)ceph_get_server, METH_VARARGS,
- "Get a server object"},
-
- {"_ceph_get_metadata", (PyCFunction)get_metadata, METH_VARARGS,
- "Get a service's metadata"},
-
- {"_ceph_get_daemon_status", (PyCFunction)get_daemon_status, METH_VARARGS,
- "Get a service's status"},
-
- {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS,
- "Send a mon command"},
-
- {"_ceph_set_health_checks", (PyCFunction)ceph_set_health_checks, METH_VARARGS,
- "Set health checks for this module"},
-
- {"_ceph_get_mgr_id", (PyCFunction)ceph_get_mgr_id, METH_NOARGS,
- "Get the name of the Mgr daemon where we are running"},
-
- {"_ceph_get_config", (PyCFunction)ceph_config_get, METH_VARARGS,
- "Get a configuration value"},
-
- {"_ceph_get_config_prefix", (PyCFunction)ceph_config_get_prefix, METH_VARARGS,
- "Get all configuration values with a given prefix"},
-
- {"_ceph_set_config", (PyCFunction)ceph_config_set, METH_VARARGS,
- "Set a configuration value"},
-
- {"_ceph_get_counter", (PyCFunction)get_counter, METH_VARARGS,
- "Get a performance counter"},
-
- {"_ceph_get_perf_schema", (PyCFunction)get_perf_schema, METH_VARARGS,
- "Get the performance counter schema"},
-
- {"_ceph_log", (PyCFunction)ceph_log, METH_VARARGS,
- "Emit a (local) log message"},
-
- {"_ceph_get_version", (PyCFunction)ceph_get_version, METH_VARARGS,
- "Get the ceph version of this process"},
-
- {"_ceph_get_context", (PyCFunction)ceph_get_context, METH_NOARGS,
- "Get a CephContext* in a python capsule"},
-
- {"_ceph_get_osdmap", (PyCFunction)ceph_get_osdmap, METH_NOARGS,
- "Get an OSDMap* in a python capsule"},
-
- {"_ceph_set_uri", (PyCFunction)ceph_set_uri, METH_VARARGS,
- "Advertize a service URI served by this module"},
-
- {NULL, NULL, 0, NULL}
-};
-
-
-static PyObject *
-BaseMgrModule_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
-{
- BaseMgrModule *self;
-
- self = (BaseMgrModule *)type->tp_alloc(type, 0);
-
- return (PyObject *)self;
-}
-
-static int
-BaseMgrModule_init(BaseMgrModule *self, PyObject *args, PyObject *kwds)
-{
- PyObject *py_modules_capsule = nullptr;
- PyObject *this_module_capsule = nullptr;
- static const char *kwlist[] = {"py_modules", "this_module", NULL};
-
- if (! PyArg_ParseTupleAndKeywords(args, kwds, "OO",
- const_cast<char**>(kwlist),
- &py_modules_capsule,
- &this_module_capsule)) {
- return -1;
- }
-
- self->py_modules = (ActivePyModules*)PyCapsule_GetPointer(
- py_modules_capsule, nullptr);
- assert(self->py_modules);
- self->this_module = (ActivePyModule*)PyCapsule_GetPointer(
- this_module_capsule, nullptr);
- assert(self->this_module);
-
- return 0;
-}
-
-PyTypeObject BaseMgrModuleType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "ceph_module.BaseMgrModule", /* tp_name */
- sizeof(BaseMgrModule), /* tp_basicsize */
- 0, /* tp_itemsize */
- 0, /* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_compare */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- 0, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
- "ceph-mgr Python Plugin", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- BaseMgrModule_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)BaseMgrModule_init, /* tp_init */
- 0, /* tp_alloc */
- BaseMgrModule_new, /* tp_new */
-};
-
+++ /dev/null
-#ifndef PYSTATE_H_
-#define PYSTATE_H_
-
-#include "Python.h"
-
-extern PyTypeObject BaseMgrModuleType;
-
-#endif
-