--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat Inc
+ *
+ * Author: John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "mgr/Mgr.h"
+
+#include "include/types.h"
+#include "common/config.h"
+#include "common/ceph_argparse.h"
+#include "common/errno.h"
+#include "global/global_init.h"
+
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_MGR, CODE_ENVIRONMENT_DAEMON, 0,
+ "mgr_data");
+ common_init_finish(g_ceph_context);
+ // For consumption by KeyRing::from_ceph_context in MonClient
+ g_conf->set_val("keyring", "$mgr_data/keyring", false);
+
+ Mgr mgr;
+
+ // Handle --help before calling init() so we don't depend on network.
+ if ((args.size() == 1 && (std::string(args[0]) == "--help" || std::string(args[0]) == "-h"))) {
+ mgr.usage();
+ return 0;
+ }
+
+ global_init_daemonize(g_ceph_context);
+ global_init_chdir(g_ceph_context);
+ common_init_finish(g_ceph_context);
+
+ // Connect to mon cluster, download MDS map etc
+ int rc = mgr.init();
+ if (rc != 0) {
+ std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl;
+ return rc;
+ }
+
+ // Finally, execute the user's commands
+ rc = mgr.main(args);
+ if (rc != 0) {
+ std::cerr << "Error (" << cpp_strerror(rc) << ")" << std::endl;
+ }
+
+ mgr.shutdown();
+
+ return rc;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "messages/MMgrDigest.h"
+
+#include "mgr/ClusterState.h"
+
+
+ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_)
+ : monc(monc_), objecter(objecter_), lock("ClusterState")
+{}
+
+void ClusterState::set_objecter(Objecter *objecter_)
+{
+ Mutex::Locker l(lock);
+
+ objecter = objecter_;
+}
+
+void ClusterState::set_fsmap(FSMap const &new_fsmap)
+{
+ Mutex::Locker l(lock);
+
+ fsmap = new_fsmap;
+}
+
+void ClusterState::load_digest(MMgrDigest *m)
+{
+ pg_summary_json = std::move(m->pg_summary_json);
+ health_json = std::move(m->health_json);
+ mon_status_json = std::move(m->mon_status_json);
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef CLUSTER_STATE_H_
+#define CLUSTER_STATE_H_
+
+#include "mds/FSMap.h"
+#include "common/Mutex.h"
+
+#include "osdc/Objecter.h"
+#include "mon/MonClient.h"
+
+class MMgrDigest;
+
+
+/**
+ * Cluster-scope state (things like cluster maps) as opposed
+ * to daemon-level state (things like perf counters and smart)
+ */
+class ClusterState
+{
+protected:
+ MonClient *monc;
+ Objecter *objecter;
+ FSMap fsmap;
+ Mutex lock;
+
+ bufferlist pg_summary_json;
+ bufferlist health_json;
+ bufferlist mon_status_json;
+
+public:
+
+ void load_digest(MMgrDigest *m);
+
+ const bufferlist &get_pg_summary() const {return pg_summary_json;}
+ const bufferlist &get_health() const {return health_json;}
+ const bufferlist &get_mon_status() const {return mon_status_json;}
+
+ ClusterState(MonClient *monc_, Objecter *objecter_);
+
+ void set_objecter(Objecter *objecter_);
+ void set_fsmap(FSMap const &new_fsmap);
+
+ template<typename Callback, typename...Args>
+ void with_fsmap(Callback&& cb, Args&&...args)
+ {
+ Mutex::Locker l(lock);
+ std::forward<Callback>(cb)(const_cast<const FSMap&>(fsmap),
+ std::forward<Args>(args)...);
+ }
+
+ template<typename Callback, typename...Args>
+ void with_monmap(Callback&& cb, Args&&...args)
+ {
+ Mutex::Locker l(lock);
+ assert(monc != nullptr);
+ monc->with_monmap(cb);
+ }
+
+ template<typename Callback, typename...Args>
+ void with_osdmap(Callback&& cb, Args&&...args)
+ {
+ Mutex::Locker l(lock);
+ assert(objecter != nullptr);
+ objecter->with_osdmap(cb);
+ }
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "DaemonMetadata.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+void DaemonMetadataIndex::insert(DaemonMetadataPtr dm)
+{
+ if (exists(dm->key)) {
+ _erase(dm->key);
+ }
+
+ by_server[dm->hostname][dm->key] = dm;
+ all[dm->key] = dm;
+}
+
+void DaemonMetadataIndex::_erase(DaemonKey dmk)
+{
+ assert(lock.is_locked_by_me());
+
+ const auto dm = all.at(dmk);
+ auto &server_collection = by_server[dm->hostname];
+ server_collection.erase(dm->key);
+ if (server_collection.empty()) {
+ by_server.erase(dm->hostname);
+ }
+
+ all.erase(dmk);
+}
+
+DaemonMetadataCollection DaemonMetadataIndex::get_by_type(uint8_t type) const
+{
+ DaemonMetadataCollection result;
+
+ for (const auto &i : all) {
+ if (i.first.first == type) {
+ result[i.first] = i.second;
+ }
+ }
+
+ return result;
+}
+
+DaemonMetadataCollection DaemonMetadataIndex::get_by_server(const std::string &hostname) const
+{
+ if (by_server.count(hostname)) {
+ return by_server.at(hostname);
+ } else {
+ return {};
+ }
+}
+
+bool DaemonMetadataIndex::exists(const DaemonKey &key) const
+{
+ return all.count(key) > 0;
+}
+
+DaemonMetadataPtr DaemonMetadataIndex::get(const DaemonKey &key)
+{
+ return all.at(key);
+}
+
+void DaemonMetadataIndex::cull(entity_type_t daemon_type,
+ std::set<std::string> names_exist)
+{
+ Mutex::Locker l(lock);
+
+ std::set<DaemonKey> victims;
+
+ for (const auto &i : all) {
+ if (i.first.first != daemon_type) {
+ continue;
+ }
+
+ if (names_exist.count(i.first.second) == 0) {
+ victims.insert(i.first);
+ }
+ }
+
+ for (const auto &i : victims) {
+ dout(4) << "Removing data for " << i << dendl;
+ _erase(i);
+ }
+}
+
+void DaemonPerfCounters::update(MMgrReport *report)
+{
+ dout(20) << "loading " << report->declare_types.size() << " new types, "
+ << report->packed.length() << " bytes of data" << dendl;
+
+ // Load any newly declared types
+ for (const auto &t : report->declare_types) {
+ types.insert(std::make_pair(t.path, t));
+ declared_types.insert(t.path);
+ }
+
+ // Parse packed data according to declared set of types
+ bufferlist::iterator p = report->packed.begin();
+ DECODE_START(1, p);
+ for (const auto &t_path : declared_types) {
+ const auto &t = types.at(t_path);
+ uint64_t val = 0;
+ uint64_t avgcount = 0;
+ uint64_t avgcount2 = 0;
+
+ ::decode(val, p);
+ if (t.type & PERFCOUNTER_LONGRUNAVG) {
+ ::decode(avgcount, p);
+ ::decode(avgcount2, p);
+ }
+ // TODO: interface for insertion of avgs, add timestamp
+ instances[t_path].push(val);
+ }
+ // TODO: handle badly encoded things without asserting out
+ DECODE_FINISH(p);
+}
+
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef DAEMON_METADATA_H_
+#define DAEMON_METADATA_H_
+
+// TODO: rename me to DaemonState from DaemonMetadata
+
+#include <map>
+#include <string>
+#include <memory>
+#include <set>
+
+#include "common/Mutex.h"
+
+#include "msg/msg_types.h"
+
+// For PerfCounterType
+#include "messages/MMgrReport.h"
+
+
+// Unique reference to a daemon within a cluster
+typedef std::pair<entity_type_t, std::string> DaemonKey;
+
+// An instance of a performance counter type, within
+// a particular daemon.
+class PerfCounterInstance
+{
+ // TODO: store some short history or whatever
+ uint64_t current;
+ public:
+ void push(uint64_t const &v) {current = v;}
+};
+
+
+typedef std::map<std::string, PerfCounterType> PerfCounterTypes;
+
+// Performance counters for one daemon
+class DaemonPerfCounters
+{
+ public:
+ // The record of perf stat types, shared between daemons
+ PerfCounterTypes &types;
+
+ DaemonPerfCounters(PerfCounterTypes &types_)
+ : types(types_)
+ {}
+
+ std::map<std::string, PerfCounterInstance> instances;
+
+ // FIXME: this state is really local to DaemonServer, it's part
+ // of the protocol rather than being part of what other classes
+ // mgiht want to read. Maybe have a separate session object
+ // inside DaemonServer instead of stashing session-ish state here?
+ std::set<std::string> declared_types;
+
+ void update(MMgrReport *report);
+};
+
+// The state that we store about one daemon
+class DaemonMetadata
+{
+ public:
+ DaemonKey key;
+
+ // The hostname where daemon was last seen running (extracted
+ // from the metadata)
+ std::string hostname;
+
+ // The metadata (hostname, version, etc) sent from the daemon
+ std::map<std::string, std::string> metadata;
+
+ // The perf counters received in MMgrReport messages
+ DaemonPerfCounters perf_counters;
+
+ DaemonMetadata(PerfCounterTypes &types_)
+ : perf_counters(types_)
+ {
+ }
+};
+
+typedef std::shared_ptr<DaemonMetadata> DaemonMetadataPtr;
+typedef std::map<DaemonKey, DaemonMetadataPtr> DaemonMetadataCollection;
+
+
+
+
+/**
+ * Fuse the collection of per-daemon metadata from Ceph into
+ * a view that can be queried by service type, ID or also
+ * by server (aka fqdn).
+ */
+class DaemonMetadataIndex
+{
+ private:
+ std::map<std::string, DaemonMetadataCollection> by_server;
+ DaemonMetadataCollection all;
+
+ std::set<DaemonKey> updating;
+
+ Mutex lock;
+
+ public:
+
+ DaemonMetadataIndex() : lock("DaemonState") {}
+
+ // FIXME: shouldn't really be public, maybe construct DaemonMetadata
+ // objects internally to avoid this.
+ PerfCounterTypes types;
+
+ void insert(DaemonMetadataPtr dm);
+ void _erase(DaemonKey dmk);
+
+ bool exists(const DaemonKey &key) const;
+ DaemonMetadataPtr get(const DaemonKey &key);
+ DaemonMetadataCollection get_by_server(const std::string &hostname) const;
+ DaemonMetadataCollection get_by_type(uint8_t type) const;
+
+ const DaemonMetadataCollection &get_all() const {return all;}
+ const std::map<std::string, DaemonMetadataCollection> &get_all_servers() const
+ {
+ return by_server;
+ }
+
+ void notify_updating(const DaemonKey &k) { updating.insert(k); }
+ void clear_updating(const DaemonKey &k) { updating.erase(k); }
+ bool is_updating(const DaemonKey &k) { return updating.count(k) > 0; }
+
+ /**
+ * Remove state for all daemons of this type whose names are
+ * not present in `names_exist`. Use this function when you have
+ * a cluster map and want to ensure that anything absent in the map
+ * is also absent in this class.
+ */
+ void cull(entity_type_t daemon_type, std::set<std::string> names_exist);
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "DaemonServer.h"
+
+#include "messages/MMgrOpen.h"
+#include "messages/MMgrConfigure.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr.server " << __func__ << " "
+
+DaemonServer::DaemonServer(MonClient *monc_,
+ DaemonMetadataIndex &daemon_state_,
+ PyModules &py_modules_)
+ : Dispatcher(g_ceph_context), msgr(nullptr), monc(monc_),
+ daemon_state(daemon_state_),
+ py_modules(py_modules_),
+ auth_registry(g_ceph_context,
+ g_conf->auth_supported.empty() ?
+ g_conf->auth_cluster_required :
+ g_conf->auth_supported),
+ lock("DaemonServer")
+{}
+
+DaemonServer::~DaemonServer() {
+ delete msgr;
+}
+
+int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
+{
+ // Initialize Messenger
+ msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
+ entity_name_t::MGR(gid), "server", getpid());
+ int r = msgr->bind(g_conf->public_addr);
+ if (r < 0)
+ return r;
+
+ msgr->set_myname(entity_name_t::MGR(gid));
+ msgr->set_addr_unknowns(client_addr);
+
+ msgr->start();
+ msgr->add_dispatcher_tail(this);
+
+ return 0;
+}
+
+entity_addr_t DaemonServer::get_myaddr() const
+{
+ return msgr->get_myaddr();
+}
+
+
+bool DaemonServer::ms_verify_authorizer(Connection *con,
+ int peer_type,
+ int protocol,
+ ceph::bufferlist& authorizer_data,
+ ceph::bufferlist& authorizer_reply,
+ bool& is_valid,
+ CryptoKey& session_key)
+{
+ auto handler = auth_registry.get_handler(protocol);
+ if (!handler) {
+ dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
+ assert(0);
+ is_valid = false;
+ return true;
+ }
+
+ AuthCapsInfo caps_info;
+ EntityName name;
+ uint64_t global_id = 0;
+
+ is_valid = handler->verify_authorizer(cct, monc->rotating_secrets,
+ authorizer_data,
+ authorizer_reply, name,
+ global_id, caps_info,
+ session_key);
+
+ // TODO: invent some caps suitable for ceph-mgr
+
+ return true;
+}
+
+
+bool DaemonServer::ms_get_authorizer(int dest_type,
+ AuthAuthorizer **authorizer, bool force_new)
+{
+ dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
+
+ if (dest_type == CEPH_ENTITY_TYPE_MON) {
+ return true;
+ }
+
+ if (force_new) {
+ if (monc->wait_auth_rotating(10) < 0)
+ return false;
+ }
+
+ *authorizer = monc->auth->build_authorizer(dest_type);
+ dout(20) << "got authorizer " << *authorizer << dendl;
+ return *authorizer != NULL;
+}
+
+
+bool DaemonServer::ms_dispatch(Message *m)
+{
+ Mutex::Locker l(lock);
+
+ switch(m->get_type()) {
+ case MSG_MGR_REPORT:
+ return handle_report(static_cast<MMgrReport*>(m));
+ case MSG_MGR_OPEN:
+ return handle_open(static_cast<MMgrOpen*>(m));
+ case MSG_COMMAND:
+ return handle_command(static_cast<MCommand*>(m));
+ default:
+ dout(1) << "Unhandled message type " << m->get_type() << dendl;
+ return false;
+ };
+}
+
+void DaemonServer::shutdown()
+{
+ msgr->shutdown();
+ msgr->wait();
+}
+
+
+
+bool DaemonServer::handle_open(MMgrOpen *m)
+{
+ DaemonKey key(
+ m->get_connection()->get_peer_type(),
+ m->daemon_name);
+
+ dout(4) << "from " << m->get_connection() << " name "
+ << m->daemon_name << dendl;
+
+ auto configure = new MMgrConfigure();
+ configure->stats_period = 5;
+ m->get_connection()->send_message(configure);
+
+ m->put();
+ return true;
+}
+
+bool DaemonServer::handle_report(MMgrReport *m)
+{
+ DaemonKey key(
+ m->get_connection()->get_peer_type(),
+ m->daemon_name);
+
+ dout(4) << "from " << m->get_connection() << " name "
+ << m->daemon_name << dendl;
+
+ DaemonMetadataPtr daemon;
+ if (daemon_state.exists(key)) {
+ daemon = daemon_state.get(key);
+ } else {
+ daemon = std::make_shared<DaemonMetadata>(daemon_state.types);
+ // FIXME: crap, we don't know the hostname at this stage.
+ daemon->key = key;
+ daemon_state.insert(daemon);
+ // FIXME: we should request metadata at this stage
+ }
+
+ assert(daemon != nullptr);
+ auto daemon_counters = daemon->perf_counters;
+ daemon_counters.update(m);
+
+ m->put();
+ return true;
+}
+
+struct MgrCommand {
+ string cmdstring;
+ string helpstring;
+ string module;
+ string perm;
+ string availability;
+} mgr_commands[] = {
+
+#define COMMAND(parsesig, helptext, module, perm, availability) \
+ {parsesig, helptext, module, perm, availability},
+
+COMMAND("foo " \
+ "name=bar,type=CephString", \
+ "do a thing", "mgr", "rw", "cli")
+};
+
+bool DaemonServer::handle_command(MCommand *m)
+{
+ int r = 0;
+ std::stringstream ss;
+ std::stringstream ds;
+ bufferlist odata;
+ std::string prefix;
+
+ assert(lock.is_locked_by_me());
+
+ cmdmap_t cmdmap;
+
+ // TODO enforce some caps
+
+ // TODO background the call into python land so that we don't
+ // block a messenger thread on python code.
+
+ ConnectionRef con = m->get_connection();
+
+ if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
+ r = -EINVAL;
+ goto out;
+ }
+
+ dout(4) << "decoded " << cmdmap.size() << dendl;
+ cmd_getval(cct, cmdmap, "prefix", prefix);
+
+ dout(4) << "prefix=" << prefix << dendl;
+
+ if (prefix == "get_command_descriptions") {
+ int cmdnum = 0;
+
+ dout(10) << "reading commands from python modules" << dendl;
+ auto py_commands = py_modules.get_commands();
+
+ JSONFormatter f;
+ f.open_object_section("command_descriptions");
+ for (const auto &pyc : py_commands) {
+ ostringstream secname;
+ secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
+ dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring
+ << ")" << dendl;
+ dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring,
+ "mgr", pyc.perm, "cli");
+ cmdnum++;
+ }
+#if 0
+ for (MgrCommand *cp = mgr_commands;
+ cp < &mgr_commands[ARRAY_SIZE(mgr_commands)]; cp++) {
+
+ ostringstream secname;
+ secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
+ dump_cmddesc_to_json(f, secname.str(), cp->cmdstring, cp->helpstring,
+ cp->module, cp->perm, cp->availability);
+ cmdnum++;
+ }
+#endif
+ f.close_section(); // command_descriptions
+
+ f.flush(ds);
+ goto out;
+ } else {
+ // Let's find you a handler!
+ MgrPyModule *handler = nullptr;
+ auto py_commands = py_modules.get_commands();
+ for (const auto &pyc : py_commands) {
+ auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
+ dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
+ if (pyc_prefix == prefix) {
+ handler = pyc.handler;
+ break;
+ }
+ }
+
+ if (handler == nullptr) {
+ ss << "No handler found for '" << prefix << "'";
+ dout(4) << "No handler found for '" << prefix << "'" << dendl;
+ r = -EINVAL;
+ goto out;
+ }
+
+ // FIXME: go run this python part in another thread, not inline
+ // with a ms_dispatch, so that the python part can block if it
+ // wants to.
+ dout(4) << "passing through " << cmdmap.size() << dendl;
+ r = handler->handle_command(cmdmap, &ds, &ss);
+ goto out;
+ }
+
+ out:
+ std::string rs;
+ rs = ss.str();
+ odata.append(ds);
+ dout(1) << "do_command r=" << r << " " << rs << dendl;
+ //clog->info() << rs << "\n";
+ if (con) {
+ MCommandReply *reply = new MCommandReply(r, rs);
+ reply->set_tid(m->get_tid());
+ reply->set_data(odata);
+ con->send_message(reply);
+ }
+
+ m->put();
+ return true;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef DAEMON_SERVER_H_
+#define DAEMON_SERVER_H_
+
+#include "PyModules.h"
+
+#include <set>
+#include <string>
+
+#include "common/Mutex.h"
+
+#include <msg/Messenger.h>
+#include <mon/MonClient.h>
+
+#include "auth/AuthAuthorizeHandler.h"
+
+#include "DaemonMetadata.h"
+
+class MMgrReport;
+class MMgrOpen;
+class MCommand;
+
+
+/**
+ * Server used in ceph-mgr to communicate with Ceph daemons like
+ * MDSs and OSDs.
+ */
+class DaemonServer : public Dispatcher
+{
+protected:
+ Messenger *msgr;
+ MonClient *monc;
+ DaemonMetadataIndex &daemon_state;
+ PyModules &py_modules;
+
+ AuthAuthorizeHandlerRegistry auth_registry;
+
+ Mutex lock;
+
+
+public:
+ int init(uint64_t gid, entity_addr_t client_addr);
+ void shutdown();
+
+ entity_addr_t get_myaddr() const;
+
+ DaemonServer(MonClient *monc_,
+ DaemonMetadataIndex &daemon_state_,
+ PyModules &py_modules_);
+ ~DaemonServer();
+
+ bool ms_dispatch(Message *m);
+ bool ms_handle_reset(Connection *con) { return false; }
+ void ms_handle_remote_reset(Connection *con) {}
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+ bool force_new);
+ bool ms_verify_authorizer(Connection *con,
+ int peer_type,
+ int protocol,
+ ceph::bufferlist& authorizer,
+ ceph::bufferlist& authorizer_reply,
+ bool& isvalid,
+ CryptoKey& session_key);
+
+ bool handle_open(MMgrOpen *m);
+ bool handle_report(MMgrReport *m);
+ bool handle_command(MCommand *m);
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "common/errno.h"
+#include "mon/MonClient.h"
+#include "include/stringify.h"
+#include "global/global_context.h"
+
+#include "mgr/MgrContext.h"
+
+#include "MgrPyModule.h"
+#include "DaemonServer.h"
+#include "messages/MMgrBeacon.h"
+#include "messages/MMgrDigest.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+
+#include "Mgr.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+
+Mgr::Mgr() :
+ Dispatcher(g_ceph_context),
+ objecter(NULL),
+ monc(new MonClient(g_ceph_context)),
+ lock("Mgr::lock"),
+ timer(g_ceph_context, lock),
+ finisher(g_ceph_context, "Mgr", "mgr-fin"),
+ waiting_for_fs_map(NULL),
+ py_modules(daemon_state, cluster_state, *monc, finisher),
+ cluster_state(monc, nullptr),
+ server(monc, daemon_state, py_modules)
+{
+ client_messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
+
+ // FIXME: using objecter as convenience to handle incremental
+ // OSD maps, but that's overkill. We don't really need an objecter.
+ // Could we separate out the part of Objecter that we really need?
+ objecter = new Objecter(g_ceph_context, client_messenger, monc, NULL, 0, 0);
+
+ cluster_state.set_objecter(objecter);
+}
+
+
+Mgr::~Mgr()
+{
+ delete objecter;
+ delete monc;
+ delete client_messenger;
+ assert(waiting_for_fs_map == NULL);
+}
+
+
+/**
+ * Context for completion of metadata mon commands: take
+ * the result and stash it in DaemonMetadataIndex
+ */
+class MetadataUpdate : public Context
+{
+ DaemonMetadataIndex &daemon_state;
+ DaemonKey key;
+
+public:
+ bufferlist outbl;
+ std::string outs;
+
+ MetadataUpdate(DaemonMetadataIndex &daemon_state_, const DaemonKey &key_)
+ : daemon_state(daemon_state_), key(key_) {}
+
+ void finish(int r)
+ {
+ daemon_state.clear_updating(key);
+ if (r == 0) {
+ if (key.first == CEPH_ENTITY_TYPE_MDS) {
+ json_spirit::mValue json_result;
+ bool read_ok = json_spirit::read(
+ outbl.to_str(), json_result);
+ if (!read_ok) {
+ dout(1) << "mon returned invalid JSON for "
+ << ceph_entity_type_name(key.first)
+ << "." << key.second << dendl;
+ return;
+ }
+
+
+ json_spirit::mObject daemon_meta = json_result.get_obj();
+ DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+ dm->key = key;
+ dm->hostname = daemon_meta.at("hostname").get_str();
+
+ daemon_meta.erase("name");
+ daemon_meta.erase("hostname");
+
+ for (const auto &i : daemon_meta) {
+ dm->metadata[i.first] = i.second.get_str();
+ }
+
+ daemon_state.insert(dm);
+ } else if (key.first == CEPH_ENTITY_TYPE_OSD) {
+ } else {
+ assert(0);
+ }
+ } else {
+ dout(1) << "mon failed to return metadata for "
+ << ceph_entity_type_name(key.first)
+ << "." << key.second << ": " << cpp_strerror(r) << dendl;
+ }
+ }
+};
+
+
+
+int Mgr::init()
+{
+ Mutex::Locker l(lock);
+
+ // Initialize Messenger
+ int r = client_messenger->bind(g_conf->public_addr);
+ if (r < 0)
+ return r;
+
+ client_messenger->start();
+
+ objecter->set_client_incarnation(0);
+ objecter->init();
+
+ // Connect dispatchers before starting objecter
+ client_messenger->add_dispatcher_tail(objecter);
+ client_messenger->add_dispatcher_tail(this);
+
+ // Initialize MonClient
+ if (monc->build_initial_monmap() < 0) {
+ objecter->shutdown();
+ client_messenger->shutdown();
+ client_messenger->wait();
+ return -1;
+ }
+
+ monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
+ |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
+ monc->set_messenger(client_messenger);
+ monc->init();
+ r = monc->authenticate();
+ if (r < 0) {
+ derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
+ monc->shutdown();
+ objecter->shutdown();
+ client_messenger->shutdown();
+ client_messenger->wait();
+ return r;
+ }
+
+ client_t whoami = monc->get_global_id();
+ client_messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+
+ // Start communicating with daemons to learn statistics etc
+ server.init(monc->get_global_id(), client_messenger->get_myaddr());
+
+ dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
+ // TODO: send the beacon periodically
+ MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(),
+ server.get_myaddr());
+ monc->send_mon_message(m);
+
+ // Preload all daemon metadata (will subsequently keep this
+ // up to date by watching maps, so do the initial load before
+ // we subscribe to any maps)
+ dout(4) << "Loading daemon metadata..." << dendl;
+ load_all_metadata();
+
+ // Preload config keys (`get` for plugins is to be a fast local
+ // operation, we we don't have to synchronize these later because
+ // all sets will come via mgr)
+ load_config();
+
+ // Start Objecter and wait for OSD map
+ objecter->start();
+ objecter->wait_for_osd_map();
+ timer.init();
+
+ monc->sub_want("mgrdigest", 0, 0);
+
+ // Prepare to receive FSMap and request it
+ dout(4) << "requesting FSMap..." << dendl;
+ C_SaferCond cond;
+ waiting_for_fs_map = &cond;
+ monc->sub_want("fsmap", 0, 0);
+ monc->renew_subs();
+
+ // Wait for FSMap
+ dout(4) << "waiting for FSMap..." << dendl;
+ lock.Unlock();
+ cond.wait();
+ lock.Lock();
+ waiting_for_fs_map = nullptr;
+ dout(4) << "Got FSMap." << dendl;
+
+ // Wait for MgrDigest...?
+ // TODO
+
+ finisher.start();
+
+ dout(4) << "Complete." << dendl;
+ return 0;
+}
+
+void Mgr::load_all_metadata()
+{
+ assert(lock.is_locked_by_me());
+
+ JSONCommand mds_cmd;
+ mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
+ JSONCommand osd_cmd;
+ osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
+ JSONCommand mon_cmd;
+ mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
+
+ mds_cmd.wait();
+ osd_cmd.wait();
+ mon_cmd.wait();
+
+ assert(mds_cmd.r == 0);
+ assert(mon_cmd.r == 0);
+ assert(osd_cmd.r == 0);
+
+ for (auto &metadata_val : mds_cmd.json_result.get_array()) {
+ json_spirit::mObject daemon_meta = metadata_val.get_obj();
+ if (daemon_meta.count("hostname") == 0) {
+ dout(1) << "Skipping incomplete metadata entry" << dendl;
+ continue;
+ }
+
+ DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+ dm->key = DaemonKey(CEPH_ENTITY_TYPE_MDS,
+ daemon_meta.at("name").get_str());
+ dm->hostname = daemon_meta.at("hostname").get_str();
+
+ daemon_meta.erase("name");
+ daemon_meta.erase("hostname");
+
+ for (const auto &i : daemon_meta) {
+ dm->metadata[i.first] = i.second.get_str();
+ }
+
+ daemon_state.insert(dm);
+ }
+
+ for (auto &metadata_val : mon_cmd.json_result.get_array()) {
+ json_spirit::mObject daemon_meta = metadata_val.get_obj();
+ if (daemon_meta.count("hostname") == 0) {
+ dout(1) << "Skipping incomplete metadata entry" << dendl;
+ continue;
+ }
+
+ DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+ dm->key = DaemonKey(CEPH_ENTITY_TYPE_MON,
+ daemon_meta.at("name").get_str());
+ dm->hostname = daemon_meta.at("hostname").get_str();
+
+ daemon_meta.erase("name");
+ daemon_meta.erase("hostname");
+
+ for (const auto &i : daemon_meta) {
+ dm->metadata[i.first] = i.second.get_str();
+ }
+
+ daemon_state.insert(dm);
+ }
+
+ for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
+ json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
+ if (osd_metadata.count("hostname") == 0) {
+ dout(1) << "Skipping incomplete metadata entry" << dendl;
+ continue;
+ }
+ dout(4) << osd_metadata.at("hostname").get_str() << dendl;
+
+ DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+ dm->key = DaemonKey(CEPH_ENTITY_TYPE_OSD,
+ stringify(osd_metadata.at("id").get_int()));
+ dm->hostname = osd_metadata.at("hostname").get_str();
+
+ osd_metadata.erase("id");
+ osd_metadata.erase("hostname");
+
+ for (const auto &i : osd_metadata) {
+ dm->metadata[i.first] = i.second.get_str();
+ }
+
+ daemon_state.insert(dm);
+ }
+}
+
+void Mgr::load_config()
+{
+ assert(lock.is_locked_by_me());
+
+ dout(10) << "listing keys" << dendl;
+ JSONCommand cmd;
+ cmd.run(monc, "{\"prefix\": \"config-key list\"}");
+
+ cmd.wait();
+ assert(cmd.r == 0);
+
+ std::map<std::string, std::string> loaded;
+
+ for (auto &key_str : cmd.json_result.get_array()) {
+ std::string const key = key_str.get_str();
+ dout(20) << "saw key '" << key << "'" << dendl;
+
+ const std::string config_prefix = PyModules::config_prefix;
+
+ if (key.substr(0, config_prefix.size()) == config_prefix) {
+ dout(20) << "fetching '" << key << "'" << dendl;
+ Command get_cmd;
+ std::ostringstream cmd_json;
+ cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
+ get_cmd.run(monc, cmd_json.str());
+ get_cmd.wait();
+ assert(get_cmd.r == 0);
+
+ loaded[key] = get_cmd.outbl.to_str();
+ }
+ }
+
+ py_modules.insert_config(loaded);
+}
+
+
+void Mgr::shutdown()
+{
+ // First stop the server so that we're not taking any more incoming requests
+ server.shutdown();
+
+ // Then stop the finisher to ensure its enqueued contexts aren't going
+ // to touch references to the things we're about to tear down
+ finisher.stop();
+
+ lock.Lock();
+ timer.shutdown();
+ objecter->shutdown();
+ lock.Unlock();
+
+ monc->shutdown();
+ client_messenger->shutdown();
+ client_messenger->wait();
+}
+
+void Mgr::handle_osd_map()
+{
+ assert(lock.is_locked_by_me());
+
+ std::set<std::string> names_exist;
+
+ /**
+ * When we see a new OSD map, inspect the entity addrs to
+ * see if they have changed (service restart), and if so
+ * reload the metadata.
+ */
+ objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
+ for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
+ if (!osd_map.exists(osd_id)) {
+ continue;
+ }
+
+ // Remember which OSDs exist so that we can cull any that don't
+ names_exist.insert(stringify(osd_id));
+
+ // Consider whether to update the daemon metadata (new/restarted daemon)
+ bool update_meta = false;
+ const auto k = DaemonKey(CEPH_ENTITY_TYPE_OSD, stringify(osd_id));
+ if (daemon_state.is_updating(k)) {
+ continue;
+ }
+
+ if (daemon_state.exists(k)) {
+ auto metadata = daemon_state.get(k);
+ auto addr_iter = metadata->metadata.find("front_addr");
+ if (addr_iter != metadata->metadata.end()) {
+ const std::string &metadata_addr = addr_iter->second;
+ const auto &map_addr = osd_map.get_addr(osd_id);
+
+ if (metadata_addr != stringify(map_addr)) {
+ dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
+ << " != " << stringify(map_addr) << dendl;
+ update_meta = true;
+ } else {
+ dout(20) << "OSD[" << osd_id << "] addr unchanged: "
+ << metadata_addr << dendl;
+ }
+ } else {
+ // Awkward case where daemon went into DaemonState because it
+ // sent us a report but its metadata didn't get loaded yet
+ update_meta = true;
+ }
+ } else {
+ update_meta = true;
+ }
+
+ if (update_meta) {
+ daemon_state.notify_updating(k);
+ auto c = new MetadataUpdate(daemon_state, k);
+ std::ostringstream cmd;
+ cmd << "{\"prefix\": \"osd metadata\", \"id\": "
+ << osd_id << "}";
+ int r = monc->start_mon_command(
+ {cmd.str()},
+ {}, &c->outbl, &c->outs, c);
+ assert(r == 0); // start_mon_command defined to not fail
+ }
+ }
+ });
+
+ // TODO: same culling for MonMap and FSMap
+ daemon_state.cull(CEPH_ENTITY_TYPE_OSD, names_exist);
+}
+
+bool Mgr::ms_dispatch(Message *m)
+{
+ derr << *m << dendl;
+ Mutex::Locker l(lock);
+
+ switch (m->get_type()) {
+ case MSG_MGR_DIGEST:
+ handle_mgr_digest(static_cast<MMgrDigest*>(m));
+ break;
+ case CEPH_MSG_MON_MAP:
+ // FIXME: we probably never get called here because MonClient
+ // has consumed the message. For consuming OSDMap we need
+ // to be the tail dispatcher, but to see MonMap we would
+ // need to be at the head.
+ // Result is that ClusterState has access to monmap (it reads
+ // from monclient anyway), but we don't see notifications. Hook
+ // into MonClient to get notifications instead of messing
+ // with message delivery to achieve it?
+ assert(0);
+
+ py_modules.notify_all("mon_map", "");
+ break;
+ case CEPH_MSG_FS_MAP:
+ py_modules.notify_all("fs_map", "");
+ handle_fs_map((MFSMap*)m);
+ m->put();
+ break;
+ case CEPH_MSG_OSD_MAP:
+
+ handle_osd_map();
+
+ py_modules.notify_all("osd_map", "");
+
+ // Continuous subscribe, so that we can generate notifications
+ // for our MgrPyModules
+ objecter->maybe_request_map();
+ m->put();
+ break;
+
+ default:
+ return false;
+ }
+ return true;
+}
+
+
+void Mgr::handle_fs_map(MFSMap* m)
+{
+ assert(lock.is_locked_by_me());
+
+ const FSMap &new_fsmap = m->get_fsmap();
+
+ if (waiting_for_fs_map) {
+ waiting_for_fs_map->complete(0);
+ waiting_for_fs_map = NULL;
+ }
+
+ // TODO: callers (e.g. from python land) are potentially going to see
+ // the new fsmap before we've bothered populating all the resulting
+ // daemon_state. Maybe we should block python land while we're making
+ // this kind of update?
+
+ cluster_state.set_fsmap(new_fsmap);
+
+ auto mds_info = new_fsmap.get_mds_info();
+ for (const auto &i : mds_info) {
+ const auto &info = i.second;
+
+ const auto k = DaemonKey(CEPH_ENTITY_TYPE_MDS, info.name);
+ if (daemon_state.is_updating(k)) {
+ continue;
+ }
+
+ bool update = false;
+ if (daemon_state.exists(k)) {
+ auto metadata = daemon_state.get(k);
+ // FIXME: nothing stopping old daemons being here, they won't have
+ // addr: need to handle case of pre-ceph-mgr daemons that don't have
+ // the fields we expect
+ auto metadata_addr = metadata->metadata.at("addr");
+ const auto map_addr = info.addr;
+
+ if (metadata_addr != stringify(map_addr)) {
+ dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
+ << " != " << stringify(map_addr) << dendl;
+ update = true;
+ } else {
+ dout(20) << "MDS[" << info.name << "] addr unchanged: "
+ << metadata_addr << dendl;
+ }
+ } else {
+ update = true;
+ }
+
+ if (update) {
+ daemon_state.notify_updating(k);
+ auto c = new MetadataUpdate(daemon_state, k);
+ std::ostringstream cmd;
+ cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
+ << info.name << "\"}";
+ int r = monc->start_mon_command(
+ {cmd.str()},
+ {}, &c->outbl, &c->outs, c);
+ assert(r == 0); // start_mon_command defined to not fail
+ }
+ }
+}
+
+
+bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+ bool force_new)
+{
+ if (dest_type == CEPH_ENTITY_TYPE_MON)
+ return true;
+
+ if (force_new) {
+ if (monc->wait_auth_rotating(10) < 0)
+ return false;
+ }
+
+ *authorizer = monc->auth->build_authorizer(dest_type);
+ return *authorizer != NULL;
+}
+
+
+int Mgr::main(vector<const char *> args)
+{
+ return py_modules.main(args);
+}
+
+
+void Mgr::handle_mgr_digest(MMgrDigest* m)
+{
+ dout(10) << m->mon_status_json.length() << dendl;
+ dout(10) << m->health_json.length() << dendl;
+ cluster_state.load_digest(m);
+ py_modules.notify_all("mon_status", "");
+ py_modules.notify_all("health", "");
+
+ // Hack: use this as a tick/opportunity to prompt python-land that
+ // the pgmap might have changed since last time we were here.
+ py_modules.notify_all("pg_summary", "");
+
+ m->put();
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef CEPH_PYFOO_H_
+#define CEPH_PYFOO_H_
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+// Python's pyconfig-64.h conflicts with ceph's acconfig.h
+#undef HAVE_SYS_WAIT_H
+#undef HAVE_UNISTD_H
+#undef HAVE_UTIME_H
+#undef _POSIX_C_SOURCE
+#undef _XOPEN_SOURCE
+
+#include "osdc/Objecter.h"
+#include "mds/FSMap.h"
+#include "messages/MFSMap.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+#include "auth/Auth.h"
+#include "common/Finisher.h"
+#include "common/Timer.h"
+
+#include "DaemonServer.h"
+#include "PyModules.h"
+
+#include "DaemonMetadata.h"
+#include "ClusterState.h"
+
+class MCommand;
+class MMgrDigest;
+
+
+class MgrPyModule;
+
+class Mgr : public Dispatcher {
+protected:
+ Objecter *objecter;
+ MonClient *monc;
+ Messenger *client_messenger;
+
+ Mutex lock;
+ SafeTimer timer;
+ Finisher finisher;
+
+ Context *waiting_for_fs_map;
+
+ PyModules py_modules;
+ DaemonMetadataIndex daemon_state;
+ ClusterState cluster_state;
+
+ DaemonServer server;
+
+ void load_config();
+ void load_all_metadata();
+
+public:
+ Mgr();
+ ~Mgr();
+
+ void handle_mgr_digest(MMgrDigest* m);
+ void handle_fs_map(MFSMap* m);
+ void handle_osd_map();
+ bool ms_dispatch(Message *m);
+ bool ms_handle_reset(Connection *con) { return false; }
+ void ms_handle_remote_reset(Connection *con) {}
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+ bool force_new);
+ int init();
+ void shutdown();
+ void usage() {}
+ int main(vector<const char *> args);
+};
+
+#endif /* MDS_UTILITY_H_ */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef MGR_CONTEXT_H_
+#define MGR_CONTEXT_H_
+
+#include <memory>
+#include "include/Context.h"
+
+#include "common/ceph_json.h"
+#include "mon/MonClient.h"
+
+class C_StdFunction : public Context
+{
+private:
+ std::function<void()> fn;
+
+public:
+ C_StdFunction(std::function<void()> fn_)
+ : fn(fn_)
+ {}
+
+ void finish(int r)
+ {
+ fn();
+ }
+};
+
+class Command
+{
+protected:
+ C_SaferCond cond;
+public:
+ bufferlist outbl;
+ std::string outs;
+ int r;
+
+ void run(MonClient *monc, const std::string &command)
+ {
+ monc->start_mon_command({command}, {},
+ &outbl, &outs, &cond);
+ }
+
+ virtual void wait()
+ {
+ r = cond.wait();
+ }
+
+ virtual ~Command() {}
+};
+
+
+class JSONCommand : public Command
+{
+public:
+ json_spirit::mValue json_result;
+
+ void wait()
+ {
+ Command::wait();
+
+ if (r == 0) {
+ bool read_ok = json_spirit::read(
+ outbl.to_str(), json_result);
+ if (!read_ok) {
+ r = -EINVAL;
+ }
+ }
+ }
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#include "PyFormatter.h"
+
+#include "common/debug.h"
+
+#include "MgrPyModule.h"
+
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+MgrPyModule::MgrPyModule(const std::string &module_name_)
+ : module_name(module_name_), pModule(nullptr), pClass(nullptr),
+ pClassInstance(nullptr)
+{}
+
+MgrPyModule::~MgrPyModule()
+{
+ Py_XDECREF(pModule);
+ Py_XDECREF(pClass);
+ Py_XDECREF(pClassInstance);
+}
+
+int MgrPyModule::load()
+{
+ // Load the module
+ PyObject *pName = PyString_FromString(module_name.c_str());
+ pModule = PyImport_Import(pName);
+ Py_DECREF(pName);
+ if (pModule == nullptr) {
+ derr << "Module not found: '" << module_name << "'" << dendl;
+ return -ENOENT;
+ }
+
+ // Find the class
+ // TODO: let them call it what they want instead of just 'Module'
+ pClass = PyObject_GetAttrString(pModule, (const char*)"Module");
+ if (pClass == nullptr) {
+ derr << "Class not found in module '" << module_name << "'" << dendl;
+ return -EINVAL;
+ }
+
+
+ // Just using the module name as the handle, replace with a
+ // uuidish thing if needed
+ auto pyHandle = PyString_FromString(module_name.c_str());
+ auto pArgs = PyTuple_Pack(1, pyHandle);
+ pClassInstance = PyObject_CallObject(pClass, pArgs);
+ if (pClassInstance == nullptr) {
+ derr << "Failed to construct class in '" << module_name << "'" << dendl;
+ return -EINVAL;
+ } else {
+ dout(1) << "Constructed class from module: " << module_name << dendl;
+ }
+ Py_DECREF(pArgs);
+
+ return load_commands();
+}
+
+int MgrPyModule::serve()
+{
+ assert(pClassInstance != nullptr);
+
+ PyGILState_STATE gstate;
+ gstate = PyGILState_Ensure();
+
+ auto pValue = PyObject_CallMethod(pClassInstance,
+ (const char*)"serve", (const char*)"()");
+
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ PyErr_Print();
+ return -EINVAL;
+ }
+
+ PyGILState_Release(gstate);
+
+ return 0;
+}
+
+void MgrPyModule::notify(const std::string ¬ify_type, const std::string ¬ify_id)
+{
+ assert(pClassInstance != nullptr);
+
+ PyGILState_STATE gstate;
+ gstate = PyGILState_Ensure();
+
+ // Execute
+ auto pValue = PyObject_CallMethod(pClassInstance, (const char*)"notify", "(ss)",
+ notify_type.c_str(), notify_id.c_str());
+
+ if (pValue != NULL) {
+ Py_DECREF(pValue);
+ } else {
+ PyErr_Print();
+ // FIXME: callers can't be expected to handle a python module
+ // that has spontaneously broken, but Mgr() should provide
+ // a hook to unload misbehaving modules when they have an
+ // error somewhere like this
+ }
+
+ PyGILState_Release(gstate);
+}
+
+int MgrPyModule::load_commands()
+{
+ PyGILState_STATE gstate;
+ gstate = PyGILState_Ensure();
+
+ PyObject *command_list = PyObject_GetAttrString(pClassInstance, "COMMANDS");
+ assert(command_list != nullptr);
+ const size_t list_size = PyList_Size(command_list);
+ for (size_t i = 0; i < list_size; ++i) {
+ PyObject *command = PyList_GetItem(command_list, i);
+ assert(command != nullptr);
+
+ ModuleCommand item;
+
+ PyObject *pCmd = PyDict_GetItemString(command, "cmd");
+ assert(pCmd != nullptr);
+ item.cmdstring = PyString_AsString(pCmd);
+
+ dout(20) << "loaded command " << item.cmdstring << dendl;
+
+ PyObject *pDesc = PyDict_GetItemString(command, "desc");
+ assert(pDesc != nullptr);
+ item.helpstring = PyString_AsString(pDesc);
+
+ PyObject *pPerm = PyDict_GetItemString(command, "perm");
+ assert(pPerm != nullptr);
+ item.perm = PyString_AsString(pPerm);
+
+ item.handler = this;
+
+ commands.push_back(item);
+ }
+ Py_DECREF(command_list);
+
+ PyGILState_Release(gstate);
+
+ dout(10) << "loaded " << commands.size() << " commands" << dendl;
+
+ return 0;
+}
+
+int MgrPyModule::handle_command(
+ const cmdmap_t &cmdmap,
+ std::stringstream *ss,
+ std::stringstream *ds)
+{
+ assert(ss != nullptr);
+ assert(ds != nullptr);
+
+ PyGILState_STATE gstate;
+ gstate = PyGILState_Ensure();
+
+ PyFormatter f;
+ cmdmap_dump(cmdmap, &f);
+ PyObject *py_cmd = f.get();
+
+ auto pResult = PyObject_CallMethod(pClassInstance,
+ (const char*)"handle_command", (const char*)"(O)", py_cmd);
+
+ Py_DECREF(py_cmd);
+
+ int r = 0;
+ if (pResult != NULL) {
+ if (PyTuple_Size(pResult) != 3) {
+ r = -EINVAL;
+ } else {
+ r = PyInt_AsLong(PyTuple_GetItem(pResult, 0));
+ *ds << PyString_AsString(PyTuple_GetItem(pResult, 1));
+ *ss << PyString_AsString(PyTuple_GetItem(pResult, 2));
+ }
+
+ Py_DECREF(pResult);
+ } else {
+ PyErr_Print();
+ r = -EINVAL;
+ }
+
+ PyGILState_Release(gstate);
+
+ return r;
+}
+
--- /dev/null
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#ifndef MGR_PY_MODULE_H_
+#define MGR_PY_MODULE_H_
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+
+#include "common/cmdparse.h"
+
+#include <vector>
+#include <string>
+
+
+class MgrPyModule;
+
+/**
+ * A Ceph CLI command description provided from a Python module
+ */
+class ModuleCommand {
+public:
+ std::string cmdstring;
+ std::string helpstring;
+ std::string perm;
+ MgrPyModule *handler;
+};
+
+class MgrPyModule
+{
+private:
+ const std::string module_name;
+ PyObject *pModule;
+ PyObject *pClass;
+ PyObject *pClassInstance;
+
+
+ std::vector<ModuleCommand> commands;
+
+ int load_commands();
+
+public:
+ MgrPyModule(const std::string &module_name);
+ ~MgrPyModule();
+
+ int load();
+ int serve();
+ void notify(const std::string ¬ify_type, const std::string ¬ify_id);
+
+ const std::vector<ModuleCommand> &get_commands() const
+ {
+ return commands;
+ }
+
+ int handle_command(
+ const cmdmap_t &cmdmap,
+ std::stringstream *ss,
+ std::stringstream *ds);
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat Inc
+ *
+ * Author: John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include "PyFormatter.h"
+
+
+void PyFormatter::open_array_section(const char *name)
+{
+ PyObject *list = PyList_New(0);
+ dump_pyobject(name, list);
+ stack.push(cursor);
+ cursor = list;
+}
+
+void PyFormatter::open_object_section(const char *name)
+{
+ PyObject *dict = PyDict_New();
+ dump_pyobject(name, dict);
+ stack.push(cursor);
+ cursor = dict;
+}
+
+void PyFormatter::dump_unsigned(const char *name, uint64_t u)
+{
+ PyObject *p = PyLong_FromLongLong(u);
+ assert(p);
+ dump_pyobject(name, p);
+}
+
+void PyFormatter::dump_int(const char *name, int64_t u)
+{
+ PyObject *p = PyLong_FromLongLong(u);
+ assert(p);
+ dump_pyobject(name, p);
+}
+
+void PyFormatter::dump_float(const char *name, double d)
+{
+ dump_pyobject(name, PyFloat_FromDouble(d));
+}
+
+void PyFormatter::dump_string(const char *name, const std::string& s)
+{
+ dump_pyobject(name, PyString_FromString(s.c_str()));
+}
+
+void PyFormatter::dump_bool(const char *name, bool b)
+{
+ if (b) {
+ Py_INCREF(Py_True);
+ dump_pyobject(name, Py_True);
+ } else {
+ Py_INCREF(Py_False);
+ dump_pyobject(name, Py_False);
+ }
+}
+
+std::ostream& PyFormatter::dump_stream(const char *name)
+{
+ // Give the caller an ostream, construct a PyString,
+ // and remember the association between the two. On flush,
+ // we'll read from the ostream into the PyString
+ auto ps = std::make_shared<PendingStream>();
+ ps->cursor = cursor;
+ ps->name = name;
+
+ pending_streams.push_back(ps);
+
+ return ps->stream;
+}
+
+void PyFormatter::dump_format_va(const char *name, const char *ns, bool quoted, const char *fmt, va_list ap)
+{
+ // TODO
+ assert(0);
+}
+
+/**
+ * Steals reference to `p`
+ */
+void PyFormatter::dump_pyobject(const char *name, PyObject *p)
+{
+ if (PyList_Check(cursor)) {
+ PyList_Append(cursor, p);
+ Py_DECREF(p);
+ } else if (PyDict_Check(cursor)) {
+ PyObject *key = PyString_FromString(name);
+ PyDict_SetItem(cursor, key, p);
+ Py_DECREF(key);
+ Py_DECREF(p);
+ } else {
+ assert(0);
+ }
+}
+
+void PyFormatter::finish_pending_streams()
+{
+ for (const auto &i : pending_streams) {
+ PyObject *tmp_cur = cursor;
+ cursor = i->cursor;
+ dump_pyobject(
+ i->name.c_str(),
+ PyString_FromString(i->stream.str().c_str()));
+ cursor = tmp_cur;
+ }
+
+ pending_streams.clear();
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat Inc
+ *
+ * Author: John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef PY_FORMATTER_H_
+#define PY_FORMATTER_H_
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+// Python's pyconfig-64.h conflicts with ceph's acconfig.h
+#undef HAVE_SYS_WAIT_H
+#undef HAVE_UNISTD_H
+#undef HAVE_UTIME_H
+#undef _POSIX_C_SOURCE
+#undef _XOPEN_SOURCE
+
+#include <stack>
+#include <memory>
+#include <list>
+
+#include "common/Formatter.h"
+
+class PyFormatter : public ceph::Formatter
+{
+public:
+ PyFormatter(bool pretty = false, bool array = false)
+ {
+ // Initialise cursor to an empty dict
+ if (!array) {
+ root = cursor = PyDict_New();
+ } else {
+ root = cursor = PyList_New(0);
+ }
+ }
+
+ ~PyFormatter()
+ {
+ cursor = NULL;
+ Py_DECREF(root);
+ root = NULL;
+ }
+
+ // Obscure, don't care.
+ void open_array_section_in_ns(const char *name, const char *ns)
+ {assert(0);}
+ void open_object_section_in_ns(const char *name, const char *ns)
+ {assert(0);}
+
+ void reset()
+ {
+ const bool array = PyList_Check(root);
+ Py_DECREF(root);
+ if (array) {
+ root = cursor = PyList_New(0);
+ } else {
+ root = cursor = PyDict_New();
+ }
+ }
+
+ virtual void set_status(int status, const char* status_name) {}
+ virtual void output_header() {};
+ virtual void output_footer() {};
+
+
+ virtual void open_array_section(const char *name);
+ void open_object_section(const char *name);
+ void close_section()
+ {
+ assert(cursor != root);
+ assert(!stack.empty());
+ cursor = stack.top();
+ stack.pop();
+ }
+ void dump_bool(const char *name, bool b);
+ void dump_unsigned(const char *name, uint64_t u);
+ void dump_int(const char *name, int64_t u);
+ void dump_float(const char *name, double d);
+ void dump_string(const char *name, const std::string& s);
+ std::ostream& dump_stream(const char *name);
+ void dump_format_va(const char *name, const char *ns, bool quoted, const char *fmt, va_list ap);
+
+ void flush(std::ostream& os)
+ {
+ // This class is not a serializer: this doens't make sense
+ assert(0);
+ }
+
+ int get_len() const
+ {
+ // This class is not a serializer: this doens't make sense
+ assert(0);
+ return 0;
+ }
+
+ void write_raw_data(const char *data)
+ {
+ // This class is not a serializer: this doens't make sense
+ assert(0);
+ }
+
+ PyObject *get()
+ {
+ finish_pending_streams();
+
+ Py_INCREF(root);
+ return root;
+ }
+
+ void finish_pending_streams();
+
+private:
+ PyObject *root;
+ PyObject *cursor;
+ std::stack<PyObject *> stack;
+
+ void dump_pyobject(const char *name, PyObject *p);
+
+ class PendingStream {
+ public:
+ PyObject *cursor;
+ std::string name;
+ std::stringstream stream;
+ };
+
+ std::list<std::shared_ptr<PendingStream> > pending_streams;
+
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#include "PyState.h"
+#include "PyFormatter.h"
+
+#include "osd/OSDMap.h"
+#include "mon/MonMap.h"
+
+#include "mgr/MgrContext.h"
+
+#include "PyModules.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+void PyModules::dump_server(const std::string &hostname,
+ const DaemonMetadataCollection &dmc,
+ Formatter *f)
+{
+ f->dump_string("hostname", hostname);
+ f->open_array_section("services");
+ std::string ceph_version;
+
+ for (const auto &i : dmc) {
+ const auto &key = i.first;
+ const std::string str_type = ceph_entity_type_name(key.first);
+ const std::string &svc_name = key.second;
+
+ // TODO: pick the highest version, and make sure that
+ // somewhere else (during health reporting?) we are
+ // indicating to the user if we see mixed versions
+ ceph_version = i.second->metadata.at("ceph_version");
+
+ f->open_object_section("service");
+ f->dump_string("type", str_type);
+ f->dump_string("id", svc_name);
+ f->close_section();
+ }
+ f->close_section();
+
+ f->dump_string("ceph_version", ceph_version);
+}
+
+
+
+PyObject *PyModules::get_server_python(const std::string &hostname)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+ dout(10) << " (" << hostname << ")" << dendl;
+
+ auto dmc = daemon_state.get_by_server(hostname);
+
+ PyFormatter f;
+ dump_server(hostname, dmc, &f);
+ return f.get();
+}
+
+
+PyObject *PyModules::list_servers_python()
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+ dout(10) << " >" << dendl;
+
+ PyFormatter f(false, true);
+ const auto &all = daemon_state.get_all_servers();
+ for (const auto &i : all) {
+ const auto &hostname = i.first;
+
+ f.open_object_section("server");
+ dump_server(hostname, i.second, &f);
+ f.close_section();
+ }
+
+ return f.get();
+}
+
+PyObject *PyModules::get_metadata_python(std::string const &handle,
+ entity_type_t svc_type, const std::string &svc_id)
+{
+ auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+ PyFormatter f;
+ f.dump_string("hostname", metadata->hostname);
+ for (const auto &i : metadata->metadata) {
+ f.dump_string(i.first.c_str(), i.second);
+ }
+
+ return f.get();
+}
+
+
+PyObject *PyModules::get_python(const std::string &what)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ if (what == "fs_map") {
+ PyFormatter f;
+ cluster_state.with_fsmap([&f](const FSMap &fsmap) {
+ fsmap.dump(&f);
+ });
+ return f.get();
+ } else if (what == "osdmap_crush_map_text") {
+ bufferlist rdata;
+ cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
+ osd_map.crush->encode(rdata);
+ });
+ std::string crush_text = rdata.to_str();
+ return PyString_FromString(crush_text.c_str());
+ } else if (what.substr(0, 7) == "osd_map") {
+ PyFormatter f;
+ cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
+ if (what == "osd_map") {
+ osd_map.dump(&f);
+ } else if (what == "osd_map_tree") {
+ osd_map.print_tree(&f, nullptr);
+ } else if (what == "osd_map_crush") {
+ osd_map.crush->dump(&f);
+ }
+ });
+ return f.get();
+ } else if (what == "config") {
+ PyFormatter f;
+ g_conf->show_config(&f);
+ return f.get();
+ } else if (what == "mon_map") {
+ PyFormatter f;
+ cluster_state.with_monmap(
+ [&f](const MonMap &monmap) {
+ monmap.dump(&f);
+ }
+ );
+ return f.get();
+ } else if (what == "fs_map") {
+ PyFormatter f;
+ cluster_state.with_fsmap(
+ [&f](const FSMap &fsmap) {
+ fsmap.dump(&f);
+ }
+ );
+ return f.get();
+ } else if (what == "osd_metadata") {
+ PyFormatter f;
+ auto dmc = daemon_state.get_by_type(CEPH_ENTITY_TYPE_OSD);
+ for (const auto &i : dmc) {
+ f.open_object_section(i.first.second.c_str());
+ f.dump_string("hostname", i.second->hostname);
+ for (const auto &j : i.second->metadata) {
+ f.dump_string(j.first.c_str(), j.second);
+ }
+ f.close_section();
+ }
+ return f.get();
+ } else if (what == "pg_summary" || what == "health" || what == "mon_status") {
+ PyFormatter f;
+ bufferlist json;
+ if (what == "pg_summary") {
+ json = cluster_state.get_pg_summary();
+ } else if (what == "health") {
+ json = cluster_state.get_health();
+ } else if (what == "mon_status") {
+ json = cluster_state.get_mon_status();
+ } else {
+ assert(false);
+ }
+ f.dump_string("json", json.to_str());
+ return f.get();
+ } else {
+ derr << "Python module requested unknown data '" << what << "'" << dendl;
+ Py_RETURN_NONE;
+ }
+}
+
+//XXX courtesy of http://stackoverflow.com/questions/1418015/how-to-get-python-exception-text
+#include <boost/python.hpp>
+// decode a Python exception into a string
+std::string handle_pyerror()
+{
+ using namespace boost::python;
+ using namespace boost;
+
+ PyObject *exc,*val,*tb;
+ object formatted_list, formatted;
+ PyErr_Fetch(&exc,&val,&tb);
+ handle<> hexc(exc),hval(allow_null(val)),htb(allow_null(tb));
+ object traceback(import("traceback"));
+ if (!tb) {
+ object format_exception_only(traceback.attr("format_exception_only"));
+ formatted_list = format_exception_only(hexc,hval);
+ } else {
+ object format_exception(traceback.attr("format_exception"));
+ formatted_list = format_exception(hexc,hval,htb);
+ }
+ formatted = str("\n").join(formatted_list);
+ return extract<std::string>(formatted);
+}
+
+int PyModules::main(vector<const char *> args)
+{
+ global_handle = this;
+
+ // Set up global python interpreter
+ Py_Initialize();
+
+ // Some python modules do not cope with an unpopulated argv, so lets
+ // fake one. This step also picks up site-packages into sys.path.
+ const char *argv[] = {"ceph-mgr"};
+ PySys_SetArgv(1, (char**)argv);
+
+ // Populate python namespace with callable hooks
+ Py_InitModule("ceph_state", CephStateMethods);
+
+ // Configure sys.path to include mgr_module_path
+ const std::string module_path = g_conf->mgr_module_path;
+ dout(4) << "Loading modules from '" << module_path << "'" << dendl;
+ std::string sys_path = Py_GetPath();
+
+ // We need site-packages for flask et al, unless we choose to
+ // embed them in the ceph package. site-packages is an interpreter-specific
+ // thing, so as an embedded interpreter we're responsible for picking
+ // this. FIXME: don't hardcode this.
+ std::string site_packages = "/usr/lib/python2.7/site-packages:/usr/lib64/python2.7/site-packages:/usr/lib64/python2.7";
+ sys_path += ":";
+ sys_path += site_packages;
+
+ sys_path += ":";
+ sys_path += module_path;
+ dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
+ PySys_SetPath((char*)(sys_path.c_str()));
+
+ // Let CPython know that we will be calling it back from other
+ // threads in future.
+ if (! PyEval_ThreadsInitialized()) {
+ PyEval_InitThreads();
+ }
+
+ // Load python code
+ // TODO load mgr_modules list, run them all in a thread each.
+ auto mod = new MgrPyModule("rest");
+ int r = mod->load();
+ if (r != 0) {
+ derr << "Error loading python module" << dendl;
+ derr << handle_pyerror() << dendl;
+#if 0
+ PyObject *ptype, *pvalue, *ptraceback;
+ PyErr_Fetch(&ptype, &pvalue, &ptraceback);
+ if (ptype) {
+ if (pvalue) {
+ char *pStrErrorMessage = PyString_AsString(pvalue);
+
+XXX why is pvalue giving null when converted to string?
+
+ assert(pStrErrorMessage != nullptr);
+ derr << "Exception: " << pStrErrorMessage << dendl;
+ Py_DECREF(ptraceback);
+ Py_DECREF(pvalue);
+ }
+ Py_DECREF(ptype);
+ }
+#endif
+
+ // FIXME: be tolerant of bad modules, log an error and continue
+ // to load other, healthy modules.
+ return r;
+ }
+ {
+ Mutex::Locker locker(lock);
+ modules["rest"] = mod;
+ }
+
+ // Execute python server
+ mod->serve();
+
+ {
+ Mutex::Locker locker(lock);
+ // Tear down modules
+ for (auto i : modules) {
+ delete i.second;
+ }
+ modules.clear();
+ }
+
+ Py_Finalize();
+ return 0;
+}
+
+void PyModules::notify_all(const std::string ¬ify_type,
+ const std::string ¬ify_id)
+{
+ Mutex::Locker l(lock);
+
+ dout(10) << __func__ << ": notify_all " << notify_type << dendl;
+ for (auto i : modules) {
+ auto module = i.second;
+ // Send all python calls down a Finisher to avoid blocking
+ // C++ code, and avoid any potential lock cycles.
+ finisher.queue(new C_StdFunction([module, notify_type, notify_id](){
+ module->notify(notify_type, notify_id);
+ }));
+ }
+}
+
+bool PyModules::get_config(const std::string &handle,
+ const std::string &key, std::string *val) const
+{
+ const std::string global_key = config_prefix + handle + "." + key;
+
+ if (config_cache.count(global_key)) {
+ *val = config_cache.at(global_key);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void PyModules::set_config(const std::string &handle,
+ const std::string &key, const std::string &val)
+{
+ const std::string global_key = config_prefix + handle + "." + key;
+
+ config_cache[global_key] = val;
+
+ std::ostringstream cmd_json;
+ Command set_cmd;
+
+ JSONFormatter jf;
+ jf.open_object_section("cmd");
+ jf.dump_string("prefix", "config-key put");
+ jf.dump_string("key", global_key);
+ jf.dump_string("val", val);
+ jf.close_section();
+ jf.flush(cmd_json);
+
+ set_cmd.run(&monc, cmd_json.str());
+ set_cmd.wait();
+
+ // FIXME: is config-key put ever allowed to fail?
+ assert(set_cmd.r == 0);
+}
+
+std::vector<ModuleCommand> PyModules::get_commands()
+{
+ Mutex::Locker l(lock);
+
+ std::vector<ModuleCommand> result;
+ for (auto i : modules) {
+ auto module = i.second;
+ auto mod_commands = module->get_commands();
+ for (auto j : mod_commands) {
+ result.push_back(j);
+ }
+ }
+
+ return result;
+}
+
+void PyModules::insert_config(const std::map<std::string,
+ std::string> &new_config)
+{
+ dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
+ config_cache = new_config;
+}
+
+void PyModules::log(const std::string &handle,
+ unsigned level, const std::string &record)
+{
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[" << handle << "] "
+ dout(level) << record << dendl;
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef PY_MODULES_H_
+#define PY_MODULES_H_
+
+#include "MgrPyModule.h"
+
+#include "common/Finisher.h"
+#include "common/Mutex.h"
+
+
+#include "DaemonMetadata.h"
+#include "ClusterState.h"
+
+
+
+
+
+class PyModules
+{
+ protected:
+ std::map<std::string, MgrPyModule*> modules;
+
+ DaemonMetadataIndex &daemon_state;
+ ClusterState &cluster_state;
+ MonClient &monc;
+ Finisher &finisher;
+
+ Mutex lock;
+
+public:
+ static constexpr auto config_prefix = "mgr.";
+
+ PyModules(DaemonMetadataIndex &ds, ClusterState &cs, MonClient &mc,
+ Finisher &f)
+ : daemon_state(ds), cluster_state(cs), monc(mc), finisher(f),
+ lock("PyModules")
+ {
+ }
+
+ // FIXME: wrap for send_command?
+ MonClient &get_monc() {return monc;}
+
+ PyObject *get_python(const std::string &what);
+ PyObject *get_server_python(const std::string &hostname);
+ PyObject *list_servers_python();
+ PyObject *get_metadata_python(std::string const &handle,
+ entity_type_t svc_type, const std::string &svc_id);
+
+ std::map<std::string, std::string> config_cache;
+
+ std::vector<ModuleCommand> get_commands();
+
+ void insert_config(const std::map<std::string, std::string> &new_config);
+
+ // Public so that MonCommandCompletion can use it
+ // FIXME: bit weird that we're sending command completions
+ // to all modules (we rely on them to ignore anything that
+ // they don't recognise), but when we get called from
+ // python-land we don't actually know who we are. Need
+ // to give python-land a handle in initialisation.
+ void notify_all(const std::string ¬ify_type,
+ const std::string ¬ify_id);
+
+ int main(std::vector<const char *> args);
+
+ void dump_server(const std::string &hostname,
+ const DaemonMetadataCollection &dmc,
+ Formatter *f);
+
+ bool get_config(const std::string &handle,
+ const std::string &key, std::string *val) const;
+ void set_config(const std::string &handle,
+ const std::string &key, const std::string &val);
+
+ void log(const std::string &handle,
+ unsigned level, const std::string &record);
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+/**
+ * The interface we present to python code that runs within
+ * ceph-mgr.
+ */
+
+#include "Mgr.h"
+
+#include "mon/MonClient.h"
+
+#include "PyState.h"
+
+PyModules *global_handle = NULL;
+
+
+class MonCommandCompletion : public Context
+{
+ PyObject *python_completion;
+ const std::string tag;
+
+public:
+ std::string outs;
+ bufferlist outbl;
+
+ MonCommandCompletion(PyObject* ev, const std::string &tag_)
+ : python_completion(ev), tag(tag_)
+ {
+ assert(python_completion != nullptr);
+ Py_INCREF(python_completion);
+ }
+
+ ~MonCommandCompletion()
+ {
+ Py_DECREF(python_completion);
+ }
+
+ void finish(int r)
+ {
+ PyGILState_STATE gstate;
+ gstate = PyGILState_Ensure();
+
+ auto set_fn = PyObject_GetAttrString(python_completion, "complete");
+ assert(set_fn != nullptr);
+
+ auto pyR = PyInt_FromLong(r);
+ auto pyOutBl = PyString_FromString(outbl.to_str().c_str());
+ auto pyOutS = PyString_FromString(outs.c_str());
+ auto args = PyTuple_Pack(3, pyR, pyOutBl, pyOutS);
+ Py_DECREF(pyR);
+ Py_DECREF(pyOutBl);
+ Py_DECREF(pyOutS);
+
+ auto rtn = PyObject_CallObject(set_fn, args);
+ if (rtn != nullptr) {
+ Py_DECREF(rtn);
+ }
+ Py_DECREF(args);
+
+ PyGILState_Release(gstate);
+
+ global_handle->notify_all("command", tag);
+ }
+};
+
+
+static PyObject*
+ceph_send_command(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *cmd_json = nullptr;
+ char *tag = nullptr;
+ PyObject *completion = nullptr;
+ if (!PyArg_ParseTuple(args, "sOss:ceph_send_command",
+ &handle, &completion, &cmd_json, &tag)) {
+ return nullptr;
+ }
+
+ auto set_fn = PyObject_GetAttrString(completion, "complete");
+ if (set_fn == nullptr) {
+ assert(0); // TODO raise python exception instead
+ } else {
+ assert(PyCallable_Check(set_fn));
+ }
+ Py_DECREF(set_fn);
+
+ auto c = new MonCommandCompletion(completion, tag);
+ auto r = global_handle->get_monc().start_mon_command(
+ {cmd_json},
+ {},
+ &c->outbl,
+ &c->outs,
+ c);
+ assert(r == 0); // start_mon_command is forbidden to fail
+
+ Py_RETURN_NONE;
+}
+
+
+static PyObject*
+ceph_state_get(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *what = NULL;
+ if (!PyArg_ParseTuple(args, "ss:ceph_state_get", &handle, &what)) {
+ return NULL;
+ }
+
+ return global_handle->get_python(what);
+}
+
+
+static PyObject*
+ceph_get_server(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *hostname = NULL;
+ if (!PyArg_ParseTuple(args, "sz:ceph_get_server", &handle, &hostname)) {
+ return NULL;
+ }
+
+ if (hostname) {
+ return global_handle->get_server_python(hostname);
+ } else {
+ return global_handle->list_servers_python();
+ }
+}
+
+static PyObject*
+ceph_config_get(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *what = nullptr;
+ if (!PyArg_ParseTuple(args, "ss:ceph_config_get", &handle, &what)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+
+ std::string value;
+ bool found = global_handle->get_config(handle, what, &value);
+ if (found) {
+ derr << "Found" << dendl;
+ return PyString_FromString(value.c_str());
+ } else {
+ derr << "Not found" << dendl;
+ Py_RETURN_NONE;
+ }
+}
+
+static PyObject*
+ceph_config_set(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *key = nullptr;
+ char *value = nullptr;
+ if (!PyArg_ParseTuple(args, "sss:ceph_config_set", &handle, &key, &value)) {
+ return nullptr;
+ }
+
+ global_handle->set_config(handle, key, value);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject*
+get_metadata(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *type_str = NULL;
+ char *svc_id = NULL;
+ if (!PyArg_ParseTuple(args, "sss:get_metadata", &handle, &type_str, &svc_id)) {
+ return nullptr;
+ }
+
+ entity_type_t svc_type;
+ if (type_str == std::string("mds")) {
+ svc_type = CEPH_ENTITY_TYPE_MDS;
+ } else if (type_str == std::string("osd")) {
+ svc_type = CEPH_ENTITY_TYPE_OSD;
+ } else if (type_str == std::string("mon")) {
+ svc_type = CEPH_ENTITY_TYPE_MON;
+ } else {
+ // FIXME: form a proper exception
+ return nullptr;
+ }
+
+ return global_handle->get_metadata_python(handle, svc_type, svc_id);
+}
+
+static PyObject*
+ceph_log(PyObject *self, PyObject *args)
+{
+ int level = 0;
+ char *record = nullptr;
+ char *handle = nullptr;
+ if (!PyArg_ParseTuple(args, "sis:log", &handle, &level, &record)) {
+ return nullptr;
+ }
+
+ global_handle->log(handle, level, record);
+
+ Py_RETURN_NONE;
+}
+
+
+
+PyMethodDef CephStateMethods[] = {
+ {"get", ceph_state_get, METH_VARARGS,
+ "Get a cluster object"},
+ {"get_server", ceph_get_server, METH_VARARGS,
+ "Get a server object"},
+ {"get_metadata", get_metadata, METH_VARARGS,
+ "Get a service's metadata"},
+ {"send_command", ceph_send_command, METH_VARARGS,
+ "Send a mon command"},
+ {"get_config", ceph_config_get, METH_VARARGS,
+ "Get a configuration value"},
+ {"set_config", ceph_config_set, METH_VARARGS,
+ "Set a configuration value"},
+ {"log", ceph_log, METH_VARARGS,
+ "Emit a (local) log message"},
+ {NULL, NULL, 0, NULL}
+};
+
--- /dev/null
+#ifndef PYSTATE_H_
+#define PYSTATE_H_
+
+#include "Python.h"
+
+class PyModules;
+
+extern PyModules *global_handle;
+extern PyMethodDef CephStateMethods[];
+
+#endif
+