From: John Spray Date: Thu, 30 Jun 2016 13:05:02 +0000 (+0100) Subject: mgr: create ceph-mgr service X-Git-Tag: v11.0.1~60^2~59 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ac30e6cee2b2d3815438f1a392a951d511bddfd4;p=ceph.git mgr: create ceph-mgr service Signed-off-by: John Spray --- diff --git a/src/ceph_mgr.cc b/src/ceph_mgr.cc new file mode 100644 index 000000000000..4a0fc758b7a9 --- /dev/null +++ b/src/ceph_mgr.cc @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Red Hat Inc + * + * Author: John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "mgr/Mgr.h" + +#include "include/types.h" +#include "common/config.h" +#include "common/ceph_argparse.h" +#include "common/errno.h" +#include "global/global_init.h" + + +int main(int argc, const char **argv) +{ + vector args; + argv_to_vec(argc, argv, args); + env_to_vec(args); + + global_init(NULL, args, CEPH_ENTITY_TYPE_MGR, CODE_ENVIRONMENT_DAEMON, 0, + "mgr_data"); + common_init_finish(g_ceph_context); + // For consumption by KeyRing::from_ceph_context in MonClient + g_conf->set_val("keyring", "$mgr_data/keyring", false); + + Mgr mgr; + + // Handle --help before calling init() so we don't depend on network. + if ((args.size() == 1 && (std::string(args[0]) == "--help" || std::string(args[0]) == "-h"))) { + mgr.usage(); + return 0; + } + + global_init_daemonize(g_ceph_context); + global_init_chdir(g_ceph_context); + common_init_finish(g_ceph_context); + + // Connect to mon cluster, download MDS map etc + int rc = mgr.init(); + if (rc != 0) { + std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl; + return rc; + } + + // Finally, execute the user's commands + rc = mgr.main(args); + if (rc != 0) { + std::cerr << "Error (" << cpp_strerror(rc) << ")" << std::endl; + } + + mgr.shutdown(); + + return rc; +} + diff --git a/src/mgr/ClusterState.cc b/src/mgr/ClusterState.cc new file mode 100644 index 000000000000..1e16810799f8 --- /dev/null +++ b/src/mgr/ClusterState.cc @@ -0,0 +1,43 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "messages/MMgrDigest.h" + +#include "mgr/ClusterState.h" + + +ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_) + : monc(monc_), objecter(objecter_), lock("ClusterState") +{} + +void ClusterState::set_objecter(Objecter *objecter_) +{ + Mutex::Locker l(lock); + + objecter = objecter_; +} + +void ClusterState::set_fsmap(FSMap const &new_fsmap) +{ + Mutex::Locker l(lock); + + fsmap = new_fsmap; +} + +void ClusterState::load_digest(MMgrDigest *m) +{ + pg_summary_json = std::move(m->pg_summary_json); + health_json = std::move(m->health_json); + mon_status_json = std::move(m->mon_status_json); +} + diff --git a/src/mgr/ClusterState.h b/src/mgr/ClusterState.h new file mode 100644 index 000000000000..44f69074daec --- /dev/null +++ b/src/mgr/ClusterState.h @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef CLUSTER_STATE_H_ +#define CLUSTER_STATE_H_ + +#include "mds/FSMap.h" +#include "common/Mutex.h" + +#include "osdc/Objecter.h" +#include "mon/MonClient.h" + +class MMgrDigest; + + +/** + * Cluster-scope state (things like cluster maps) as opposed + * to daemon-level state (things like perf counters and smart) + */ +class ClusterState +{ +protected: + MonClient *monc; + Objecter *objecter; + FSMap fsmap; + Mutex lock; + + bufferlist pg_summary_json; + bufferlist health_json; + bufferlist mon_status_json; + +public: + + void load_digest(MMgrDigest *m); + + const bufferlist &get_pg_summary() const {return pg_summary_json;} + const bufferlist &get_health() const {return health_json;} + const bufferlist &get_mon_status() const {return mon_status_json;} + + ClusterState(MonClient *monc_, Objecter *objecter_); + + void set_objecter(Objecter *objecter_); + void set_fsmap(FSMap const &new_fsmap); + + template + void with_fsmap(Callback&& cb, Args&&...args) + { + Mutex::Locker l(lock); + std::forward(cb)(const_cast(fsmap), + std::forward(args)...); + } + + template + void with_monmap(Callback&& cb, Args&&...args) + { + Mutex::Locker l(lock); + assert(monc != nullptr); + monc->with_monmap(cb); + } + + template + void with_osdmap(Callback&& cb, Args&&...args) + { + Mutex::Locker l(lock); + assert(objecter != nullptr); + objecter->with_osdmap(cb); + } +}; + +#endif + diff --git a/src/mgr/DaemonMetadata.cc b/src/mgr/DaemonMetadata.cc new file mode 100644 index 000000000000..dc3885c96d53 --- /dev/null +++ b/src/mgr/DaemonMetadata.cc @@ -0,0 +1,132 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "DaemonMetadata.h" + +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + +void DaemonMetadataIndex::insert(DaemonMetadataPtr dm) +{ + if (exists(dm->key)) { + _erase(dm->key); + } + + by_server[dm->hostname][dm->key] = dm; + all[dm->key] = dm; +} + +void DaemonMetadataIndex::_erase(DaemonKey dmk) +{ + assert(lock.is_locked_by_me()); + + const auto dm = all.at(dmk); + auto &server_collection = by_server[dm->hostname]; + server_collection.erase(dm->key); + if (server_collection.empty()) { + by_server.erase(dm->hostname); + } + + all.erase(dmk); +} + +DaemonMetadataCollection DaemonMetadataIndex::get_by_type(uint8_t type) const +{ + DaemonMetadataCollection result; + + for (const auto &i : all) { + if (i.first.first == type) { + result[i.first] = i.second; + } + } + + return result; +} + +DaemonMetadataCollection DaemonMetadataIndex::get_by_server(const std::string &hostname) const +{ + if (by_server.count(hostname)) { + return by_server.at(hostname); + } else { + return {}; + } +} + +bool DaemonMetadataIndex::exists(const DaemonKey &key) const +{ + return all.count(key) > 0; +} + +DaemonMetadataPtr DaemonMetadataIndex::get(const DaemonKey &key) +{ + return all.at(key); +} + +void DaemonMetadataIndex::cull(entity_type_t daemon_type, + std::set names_exist) +{ + Mutex::Locker l(lock); + + std::set victims; + + for (const auto &i : all) { + if (i.first.first != daemon_type) { + continue; + } + + if (names_exist.count(i.first.second) == 0) { + victims.insert(i.first); + } + } + + for (const auto &i : victims) { + dout(4) << "Removing data for " << i << dendl; + _erase(i); + } +} + +void DaemonPerfCounters::update(MMgrReport *report) +{ + dout(20) << "loading " << report->declare_types.size() << " new types, " + << report->packed.length() << " bytes of data" << dendl; + + // Load any newly declared types + for (const auto &t : report->declare_types) { + types.insert(std::make_pair(t.path, t)); + declared_types.insert(t.path); + } + + // Parse packed data according to declared set of types + bufferlist::iterator p = report->packed.begin(); + DECODE_START(1, p); + for (const auto &t_path : declared_types) { + const auto &t = types.at(t_path); + uint64_t val = 0; + uint64_t avgcount = 0; + uint64_t avgcount2 = 0; + + ::decode(val, p); + if (t.type & PERFCOUNTER_LONGRUNAVG) { + ::decode(avgcount, p); + ::decode(avgcount2, p); + } + // TODO: interface for insertion of avgs, add timestamp + instances[t_path].push(val); + } + // TODO: handle badly encoded things without asserting out + DECODE_FINISH(p); +} + + + diff --git a/src/mgr/DaemonMetadata.h b/src/mgr/DaemonMetadata.h new file mode 100644 index 000000000000..4aa406c52125 --- /dev/null +++ b/src/mgr/DaemonMetadata.h @@ -0,0 +1,149 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef DAEMON_METADATA_H_ +#define DAEMON_METADATA_H_ + +// TODO: rename me to DaemonState from DaemonMetadata + +#include +#include +#include +#include + +#include "common/Mutex.h" + +#include "msg/msg_types.h" + +// For PerfCounterType +#include "messages/MMgrReport.h" + + +// Unique reference to a daemon within a cluster +typedef std::pair DaemonKey; + +// An instance of a performance counter type, within +// a particular daemon. +class PerfCounterInstance +{ + // TODO: store some short history or whatever + uint64_t current; + public: + void push(uint64_t const &v) {current = v;} +}; + + +typedef std::map PerfCounterTypes; + +// Performance counters for one daemon +class DaemonPerfCounters +{ + public: + // The record of perf stat types, shared between daemons + PerfCounterTypes &types; + + DaemonPerfCounters(PerfCounterTypes &types_) + : types(types_) + {} + + std::map instances; + + // FIXME: this state is really local to DaemonServer, it's part + // of the protocol rather than being part of what other classes + // mgiht want to read. Maybe have a separate session object + // inside DaemonServer instead of stashing session-ish state here? + std::set declared_types; + + void update(MMgrReport *report); +}; + +// The state that we store about one daemon +class DaemonMetadata +{ + public: + DaemonKey key; + + // The hostname where daemon was last seen running (extracted + // from the metadata) + std::string hostname; + + // The metadata (hostname, version, etc) sent from the daemon + std::map metadata; + + // The perf counters received in MMgrReport messages + DaemonPerfCounters perf_counters; + + DaemonMetadata(PerfCounterTypes &types_) + : perf_counters(types_) + { + } +}; + +typedef std::shared_ptr DaemonMetadataPtr; +typedef std::map DaemonMetadataCollection; + + + + +/** + * Fuse the collection of per-daemon metadata from Ceph into + * a view that can be queried by service type, ID or also + * by server (aka fqdn). + */ +class DaemonMetadataIndex +{ + private: + std::map by_server; + DaemonMetadataCollection all; + + std::set updating; + + Mutex lock; + + public: + + DaemonMetadataIndex() : lock("DaemonState") {} + + // FIXME: shouldn't really be public, maybe construct DaemonMetadata + // objects internally to avoid this. + PerfCounterTypes types; + + void insert(DaemonMetadataPtr dm); + void _erase(DaemonKey dmk); + + bool exists(const DaemonKey &key) const; + DaemonMetadataPtr get(const DaemonKey &key); + DaemonMetadataCollection get_by_server(const std::string &hostname) const; + DaemonMetadataCollection get_by_type(uint8_t type) const; + + const DaemonMetadataCollection &get_all() const {return all;} + const std::map &get_all_servers() const + { + return by_server; + } + + void notify_updating(const DaemonKey &k) { updating.insert(k); } + void clear_updating(const DaemonKey &k) { updating.erase(k); } + bool is_updating(const DaemonKey &k) { return updating.count(k) > 0; } + + /** + * Remove state for all daemons of this type whose names are + * not present in `names_exist`. Use this function when you have + * a cluster map and want to ensure that anything absent in the map + * is also absent in this class. + */ + void cull(entity_type_t daemon_type, std::set names_exist); +}; + +#endif + diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc new file mode 100644 index 000000000000..bfd551d12257 --- /dev/null +++ b/src/mgr/DaemonServer.cc @@ -0,0 +1,309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "DaemonServer.h" + +#include "messages/MMgrOpen.h" +#include "messages/MMgrConfigure.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" + +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr.server " << __func__ << " " + +DaemonServer::DaemonServer(MonClient *monc_, + DaemonMetadataIndex &daemon_state_, + PyModules &py_modules_) + : Dispatcher(g_ceph_context), msgr(nullptr), monc(monc_), + daemon_state(daemon_state_), + py_modules(py_modules_), + auth_registry(g_ceph_context, + g_conf->auth_supported.empty() ? + g_conf->auth_cluster_required : + g_conf->auth_supported), + lock("DaemonServer") +{} + +DaemonServer::~DaemonServer() { + delete msgr; +} + +int DaemonServer::init(uint64_t gid, entity_addr_t client_addr) +{ + // Initialize Messenger + msgr = Messenger::create(g_ceph_context, g_conf->ms_type, + entity_name_t::MGR(gid), "server", getpid()); + int r = msgr->bind(g_conf->public_addr); + if (r < 0) + return r; + + msgr->set_myname(entity_name_t::MGR(gid)); + msgr->set_addr_unknowns(client_addr); + + msgr->start(); + msgr->add_dispatcher_tail(this); + + return 0; +} + +entity_addr_t DaemonServer::get_myaddr() const +{ + return msgr->get_myaddr(); +} + + +bool DaemonServer::ms_verify_authorizer(Connection *con, + int peer_type, + int protocol, + ceph::bufferlist& authorizer_data, + ceph::bufferlist& authorizer_reply, + bool& is_valid, + CryptoKey& session_key) +{ + auto handler = auth_registry.get_handler(protocol); + if (!handler) { + dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl; + assert(0); + is_valid = false; + return true; + } + + AuthCapsInfo caps_info; + EntityName name; + uint64_t global_id = 0; + + is_valid = handler->verify_authorizer(cct, monc->rotating_secrets, + authorizer_data, + authorizer_reply, name, + global_id, caps_info, + session_key); + + // TODO: invent some caps suitable for ceph-mgr + + return true; +} + + +bool DaemonServer::ms_get_authorizer(int dest_type, + AuthAuthorizer **authorizer, bool force_new) +{ + dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl; + + if (dest_type == CEPH_ENTITY_TYPE_MON) { + return true; + } + + if (force_new) { + if (monc->wait_auth_rotating(10) < 0) + return false; + } + + *authorizer = monc->auth->build_authorizer(dest_type); + dout(20) << "got authorizer " << *authorizer << dendl; + return *authorizer != NULL; +} + + +bool DaemonServer::ms_dispatch(Message *m) +{ + Mutex::Locker l(lock); + + switch(m->get_type()) { + case MSG_MGR_REPORT: + return handle_report(static_cast(m)); + case MSG_MGR_OPEN: + return handle_open(static_cast(m)); + case MSG_COMMAND: + return handle_command(static_cast(m)); + default: + dout(1) << "Unhandled message type " << m->get_type() << dendl; + return false; + }; +} + +void DaemonServer::shutdown() +{ + msgr->shutdown(); + msgr->wait(); +} + + + +bool DaemonServer::handle_open(MMgrOpen *m) +{ + DaemonKey key( + m->get_connection()->get_peer_type(), + m->daemon_name); + + dout(4) << "from " << m->get_connection() << " name " + << m->daemon_name << dendl; + + auto configure = new MMgrConfigure(); + configure->stats_period = 5; + m->get_connection()->send_message(configure); + + m->put(); + return true; +} + +bool DaemonServer::handle_report(MMgrReport *m) +{ + DaemonKey key( + m->get_connection()->get_peer_type(), + m->daemon_name); + + dout(4) << "from " << m->get_connection() << " name " + << m->daemon_name << dendl; + + DaemonMetadataPtr daemon; + if (daemon_state.exists(key)) { + daemon = daemon_state.get(key); + } else { + daemon = std::make_shared(daemon_state.types); + // FIXME: crap, we don't know the hostname at this stage. + daemon->key = key; + daemon_state.insert(daemon); + // FIXME: we should request metadata at this stage + } + + assert(daemon != nullptr); + auto daemon_counters = daemon->perf_counters; + daemon_counters.update(m); + + m->put(); + return true; +} + +struct MgrCommand { + string cmdstring; + string helpstring; + string module; + string perm; + string availability; +} mgr_commands[] = { + +#define COMMAND(parsesig, helptext, module, perm, availability) \ + {parsesig, helptext, module, perm, availability}, + +COMMAND("foo " \ + "name=bar,type=CephString", \ + "do a thing", "mgr", "rw", "cli") +}; + +bool DaemonServer::handle_command(MCommand *m) +{ + int r = 0; + std::stringstream ss; + std::stringstream ds; + bufferlist odata; + std::string prefix; + + assert(lock.is_locked_by_me()); + + cmdmap_t cmdmap; + + // TODO enforce some caps + + // TODO background the call into python land so that we don't + // block a messenger thread on python code. + + ConnectionRef con = m->get_connection(); + + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { + r = -EINVAL; + goto out; + } + + dout(4) << "decoded " << cmdmap.size() << dendl; + cmd_getval(cct, cmdmap, "prefix", prefix); + + dout(4) << "prefix=" << prefix << dendl; + + if (prefix == "get_command_descriptions") { + int cmdnum = 0; + + dout(10) << "reading commands from python modules" << dendl; + auto py_commands = py_modules.get_commands(); + + JSONFormatter f; + f.open_object_section("command_descriptions"); + for (const auto &pyc : py_commands) { + ostringstream secname; + secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; + dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring + << ")" << dendl; + dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring, + "mgr", pyc.perm, "cli"); + cmdnum++; + } +#if 0 + for (MgrCommand *cp = mgr_commands; + cp < &mgr_commands[ARRAY_SIZE(mgr_commands)]; cp++) { + + ostringstream secname; + secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; + dump_cmddesc_to_json(f, secname.str(), cp->cmdstring, cp->helpstring, + cp->module, cp->perm, cp->availability); + cmdnum++; + } +#endif + f.close_section(); // command_descriptions + + f.flush(ds); + goto out; + } else { + // Let's find you a handler! + MgrPyModule *handler = nullptr; + auto py_commands = py_modules.get_commands(); + for (const auto &pyc : py_commands) { + auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring); + dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl; + if (pyc_prefix == prefix) { + handler = pyc.handler; + break; + } + } + + if (handler == nullptr) { + ss << "No handler found for '" << prefix << "'"; + dout(4) << "No handler found for '" << prefix << "'" << dendl; + r = -EINVAL; + goto out; + } + + // FIXME: go run this python part in another thread, not inline + // with a ms_dispatch, so that the python part can block if it + // wants to. + dout(4) << "passing through " << cmdmap.size() << dendl; + r = handler->handle_command(cmdmap, &ds, &ss); + goto out; + } + + out: + std::string rs; + rs = ss.str(); + odata.append(ds); + dout(1) << "do_command r=" << r << " " << rs << dendl; + //clog->info() << rs << "\n"; + if (con) { + MCommandReply *reply = new MCommandReply(r, rs); + reply->set_tid(m->get_tid()); + reply->set_data(odata); + con->send_message(reply); + } + + m->put(); + return true; +} + diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h new file mode 100644 index 000000000000..ab9357436002 --- /dev/null +++ b/src/mgr/DaemonServer.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef DAEMON_SERVER_H_ +#define DAEMON_SERVER_H_ + +#include "PyModules.h" + +#include +#include + +#include "common/Mutex.h" + +#include +#include + +#include "auth/AuthAuthorizeHandler.h" + +#include "DaemonMetadata.h" + +class MMgrReport; +class MMgrOpen; +class MCommand; + + +/** + * Server used in ceph-mgr to communicate with Ceph daemons like + * MDSs and OSDs. + */ +class DaemonServer : public Dispatcher +{ +protected: + Messenger *msgr; + MonClient *monc; + DaemonMetadataIndex &daemon_state; + PyModules &py_modules; + + AuthAuthorizeHandlerRegistry auth_registry; + + Mutex lock; + + +public: + int init(uint64_t gid, entity_addr_t client_addr); + void shutdown(); + + entity_addr_t get_myaddr() const; + + DaemonServer(MonClient *monc_, + DaemonMetadataIndex &daemon_state_, + PyModules &py_modules_); + ~DaemonServer(); + + bool ms_dispatch(Message *m); + bool ms_handle_reset(Connection *con) { return false; } + void ms_handle_remote_reset(Connection *con) {} + bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, + bool force_new); + bool ms_verify_authorizer(Connection *con, + int peer_type, + int protocol, + ceph::bufferlist& authorizer, + ceph::bufferlist& authorizer_reply, + bool& isvalid, + CryptoKey& session_key); + + bool handle_open(MMgrOpen *m); + bool handle_report(MMgrReport *m); + bool handle_command(MCommand *m); +}; + +#endif + diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc new file mode 100644 index 000000000000..128426c8b626 --- /dev/null +++ b/src/mgr/Mgr.cc @@ -0,0 +1,576 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "common/errno.h" +#include "mon/MonClient.h" +#include "include/stringify.h" +#include "global/global_context.h" + +#include "mgr/MgrContext.h" + +#include "MgrPyModule.h" +#include "DaemonServer.h" +#include "messages/MMgrBeacon.h" +#include "messages/MMgrDigest.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" + +#include "Mgr.h" + +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + + +Mgr::Mgr() : + Dispatcher(g_ceph_context), + objecter(NULL), + monc(new MonClient(g_ceph_context)), + lock("Mgr::lock"), + timer(g_ceph_context, lock), + finisher(g_ceph_context, "Mgr", "mgr-fin"), + waiting_for_fs_map(NULL), + py_modules(daemon_state, cluster_state, *monc, finisher), + cluster_state(monc, nullptr), + server(monc, daemon_state, py_modules) +{ + client_messenger = Messenger::create_client_messenger(g_ceph_context, "mds"); + + // FIXME: using objecter as convenience to handle incremental + // OSD maps, but that's overkill. We don't really need an objecter. + // Could we separate out the part of Objecter that we really need? + objecter = new Objecter(g_ceph_context, client_messenger, monc, NULL, 0, 0); + + cluster_state.set_objecter(objecter); +} + + +Mgr::~Mgr() +{ + delete objecter; + delete monc; + delete client_messenger; + assert(waiting_for_fs_map == NULL); +} + + +/** + * Context for completion of metadata mon commands: take + * the result and stash it in DaemonMetadataIndex + */ +class MetadataUpdate : public Context +{ + DaemonMetadataIndex &daemon_state; + DaemonKey key; + +public: + bufferlist outbl; + std::string outs; + + MetadataUpdate(DaemonMetadataIndex &daemon_state_, const DaemonKey &key_) + : daemon_state(daemon_state_), key(key_) {} + + void finish(int r) + { + daemon_state.clear_updating(key); + if (r == 0) { + if (key.first == CEPH_ENTITY_TYPE_MDS) { + json_spirit::mValue json_result; + bool read_ok = json_spirit::read( + outbl.to_str(), json_result); + if (!read_ok) { + dout(1) << "mon returned invalid JSON for " + << ceph_entity_type_name(key.first) + << "." << key.second << dendl; + return; + } + + + json_spirit::mObject daemon_meta = json_result.get_obj(); + DaemonMetadataPtr dm = std::make_shared(daemon_state.types); + dm->key = key; + dm->hostname = daemon_meta.at("hostname").get_str(); + + daemon_meta.erase("name"); + daemon_meta.erase("hostname"); + + for (const auto &i : daemon_meta) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } else if (key.first == CEPH_ENTITY_TYPE_OSD) { + } else { + assert(0); + } + } else { + dout(1) << "mon failed to return metadata for " + << ceph_entity_type_name(key.first) + << "." << key.second << ": " << cpp_strerror(r) << dendl; + } + } +}; + + + +int Mgr::init() +{ + Mutex::Locker l(lock); + + // Initialize Messenger + int r = client_messenger->bind(g_conf->public_addr); + if (r < 0) + return r; + + client_messenger->start(); + + objecter->set_client_incarnation(0); + objecter->init(); + + // Connect dispatchers before starting objecter + client_messenger->add_dispatcher_tail(objecter); + client_messenger->add_dispatcher_tail(this); + + // Initialize MonClient + if (monc->build_initial_monmap() < 0) { + objecter->shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return -1; + } + + monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD + |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); + monc->set_messenger(client_messenger); + monc->init(); + r = monc->authenticate(); + if (r < 0) { + derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl; + monc->shutdown(); + objecter->shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + + client_t whoami = monc->get_global_id(); + client_messenger->set_myname(entity_name_t::CLIENT(whoami.v)); + + // Start communicating with daemons to learn statistics etc + server.init(monc->get_global_id(), client_messenger->get_myaddr()); + + dout(4) << "Initialized server at " << server.get_myaddr() << dendl; + // TODO: send the beacon periodically + MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(), + server.get_myaddr()); + monc->send_mon_message(m); + + // Preload all daemon metadata (will subsequently keep this + // up to date by watching maps, so do the initial load before + // we subscribe to any maps) + dout(4) << "Loading daemon metadata..." << dendl; + load_all_metadata(); + + // Preload config keys (`get` for plugins is to be a fast local + // operation, we we don't have to synchronize these later because + // all sets will come via mgr) + load_config(); + + // Start Objecter and wait for OSD map + objecter->start(); + objecter->wait_for_osd_map(); + timer.init(); + + monc->sub_want("mgrdigest", 0, 0); + + // Prepare to receive FSMap and request it + dout(4) << "requesting FSMap..." << dendl; + C_SaferCond cond; + waiting_for_fs_map = &cond; + monc->sub_want("fsmap", 0, 0); + monc->renew_subs(); + + // Wait for FSMap + dout(4) << "waiting for FSMap..." << dendl; + lock.Unlock(); + cond.wait(); + lock.Lock(); + waiting_for_fs_map = nullptr; + dout(4) << "Got FSMap." << dendl; + + // Wait for MgrDigest...? + // TODO + + finisher.start(); + + dout(4) << "Complete." << dendl; + return 0; +} + +void Mgr::load_all_metadata() +{ + assert(lock.is_locked_by_me()); + + JSONCommand mds_cmd; + mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}"); + JSONCommand osd_cmd; + osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}"); + JSONCommand mon_cmd; + mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}"); + + mds_cmd.wait(); + osd_cmd.wait(); + mon_cmd.wait(); + + assert(mds_cmd.r == 0); + assert(mon_cmd.r == 0); + assert(osd_cmd.r == 0); + + for (auto &metadata_val : mds_cmd.json_result.get_array()) { + json_spirit::mObject daemon_meta = metadata_val.get_obj(); + if (daemon_meta.count("hostname") == 0) { + dout(1) << "Skipping incomplete metadata entry" << dendl; + continue; + } + + DaemonMetadataPtr dm = std::make_shared(daemon_state.types); + dm->key = DaemonKey(CEPH_ENTITY_TYPE_MDS, + daemon_meta.at("name").get_str()); + dm->hostname = daemon_meta.at("hostname").get_str(); + + daemon_meta.erase("name"); + daemon_meta.erase("hostname"); + + for (const auto &i : daemon_meta) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } + + for (auto &metadata_val : mon_cmd.json_result.get_array()) { + json_spirit::mObject daemon_meta = metadata_val.get_obj(); + if (daemon_meta.count("hostname") == 0) { + dout(1) << "Skipping incomplete metadata entry" << dendl; + continue; + } + + DaemonMetadataPtr dm = std::make_shared(daemon_state.types); + dm->key = DaemonKey(CEPH_ENTITY_TYPE_MON, + daemon_meta.at("name").get_str()); + dm->hostname = daemon_meta.at("hostname").get_str(); + + daemon_meta.erase("name"); + daemon_meta.erase("hostname"); + + for (const auto &i : daemon_meta) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } + + for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) { + json_spirit::mObject osd_metadata = osd_metadata_val.get_obj(); + if (osd_metadata.count("hostname") == 0) { + dout(1) << "Skipping incomplete metadata entry" << dendl; + continue; + } + dout(4) << osd_metadata.at("hostname").get_str() << dendl; + + DaemonMetadataPtr dm = std::make_shared(daemon_state.types); + dm->key = DaemonKey(CEPH_ENTITY_TYPE_OSD, + stringify(osd_metadata.at("id").get_int())); + dm->hostname = osd_metadata.at("hostname").get_str(); + + osd_metadata.erase("id"); + osd_metadata.erase("hostname"); + + for (const auto &i : osd_metadata) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } +} + +void Mgr::load_config() +{ + assert(lock.is_locked_by_me()); + + dout(10) << "listing keys" << dendl; + JSONCommand cmd; + cmd.run(monc, "{\"prefix\": \"config-key list\"}"); + + cmd.wait(); + assert(cmd.r == 0); + + std::map loaded; + + for (auto &key_str : cmd.json_result.get_array()) { + std::string const key = key_str.get_str(); + dout(20) << "saw key '" << key << "'" << dendl; + + const std::string config_prefix = PyModules::config_prefix; + + if (key.substr(0, config_prefix.size()) == config_prefix) { + dout(20) << "fetching '" << key << "'" << dendl; + Command get_cmd; + std::ostringstream cmd_json; + cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}"; + get_cmd.run(monc, cmd_json.str()); + get_cmd.wait(); + assert(get_cmd.r == 0); + + loaded[key] = get_cmd.outbl.to_str(); + } + } + + py_modules.insert_config(loaded); +} + + +void Mgr::shutdown() +{ + // First stop the server so that we're not taking any more incoming requests + server.shutdown(); + + // Then stop the finisher to ensure its enqueued contexts aren't going + // to touch references to the things we're about to tear down + finisher.stop(); + + lock.Lock(); + timer.shutdown(); + objecter->shutdown(); + lock.Unlock(); + + monc->shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); +} + +void Mgr::handle_osd_map() +{ + assert(lock.is_locked_by_me()); + + std::set names_exist; + + /** + * When we see a new OSD map, inspect the entity addrs to + * see if they have changed (service restart), and if so + * reload the metadata. + */ + objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) { + for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) { + if (!osd_map.exists(osd_id)) { + continue; + } + + // Remember which OSDs exist so that we can cull any that don't + names_exist.insert(stringify(osd_id)); + + // Consider whether to update the daemon metadata (new/restarted daemon) + bool update_meta = false; + const auto k = DaemonKey(CEPH_ENTITY_TYPE_OSD, stringify(osd_id)); + if (daemon_state.is_updating(k)) { + continue; + } + + if (daemon_state.exists(k)) { + auto metadata = daemon_state.get(k); + auto addr_iter = metadata->metadata.find("front_addr"); + if (addr_iter != metadata->metadata.end()) { + const std::string &metadata_addr = addr_iter->second; + const auto &map_addr = osd_map.get_addr(osd_id); + + if (metadata_addr != stringify(map_addr)) { + dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr + << " != " << stringify(map_addr) << dendl; + update_meta = true; + } else { + dout(20) << "OSD[" << osd_id << "] addr unchanged: " + << metadata_addr << dendl; + } + } else { + // Awkward case where daemon went into DaemonState because it + // sent us a report but its metadata didn't get loaded yet + update_meta = true; + } + } else { + update_meta = true; + } + + if (update_meta) { + daemon_state.notify_updating(k); + auto c = new MetadataUpdate(daemon_state, k); + std::ostringstream cmd; + cmd << "{\"prefix\": \"osd metadata\", \"id\": " + << osd_id << "}"; + int r = monc->start_mon_command( + {cmd.str()}, + {}, &c->outbl, &c->outs, c); + assert(r == 0); // start_mon_command defined to not fail + } + } + }); + + // TODO: same culling for MonMap and FSMap + daemon_state.cull(CEPH_ENTITY_TYPE_OSD, names_exist); +} + +bool Mgr::ms_dispatch(Message *m) +{ + derr << *m << dendl; + Mutex::Locker l(lock); + + switch (m->get_type()) { + case MSG_MGR_DIGEST: + handle_mgr_digest(static_cast(m)); + break; + case CEPH_MSG_MON_MAP: + // FIXME: we probably never get called here because MonClient + // has consumed the message. For consuming OSDMap we need + // to be the tail dispatcher, but to see MonMap we would + // need to be at the head. + // Result is that ClusterState has access to monmap (it reads + // from monclient anyway), but we don't see notifications. Hook + // into MonClient to get notifications instead of messing + // with message delivery to achieve it? + assert(0); + + py_modules.notify_all("mon_map", ""); + break; + case CEPH_MSG_FS_MAP: + py_modules.notify_all("fs_map", ""); + handle_fs_map((MFSMap*)m); + m->put(); + break; + case CEPH_MSG_OSD_MAP: + + handle_osd_map(); + + py_modules.notify_all("osd_map", ""); + + // Continuous subscribe, so that we can generate notifications + // for our MgrPyModules + objecter->maybe_request_map(); + m->put(); + break; + + default: + return false; + } + return true; +} + + +void Mgr::handle_fs_map(MFSMap* m) +{ + assert(lock.is_locked_by_me()); + + const FSMap &new_fsmap = m->get_fsmap(); + + if (waiting_for_fs_map) { + waiting_for_fs_map->complete(0); + waiting_for_fs_map = NULL; + } + + // TODO: callers (e.g. from python land) are potentially going to see + // the new fsmap before we've bothered populating all the resulting + // daemon_state. Maybe we should block python land while we're making + // this kind of update? + + cluster_state.set_fsmap(new_fsmap); + + auto mds_info = new_fsmap.get_mds_info(); + for (const auto &i : mds_info) { + const auto &info = i.second; + + const auto k = DaemonKey(CEPH_ENTITY_TYPE_MDS, info.name); + if (daemon_state.is_updating(k)) { + continue; + } + + bool update = false; + if (daemon_state.exists(k)) { + auto metadata = daemon_state.get(k); + // FIXME: nothing stopping old daemons being here, they won't have + // addr: need to handle case of pre-ceph-mgr daemons that don't have + // the fields we expect + auto metadata_addr = metadata->metadata.at("addr"); + const auto map_addr = info.addr; + + if (metadata_addr != stringify(map_addr)) { + dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr + << " != " << stringify(map_addr) << dendl; + update = true; + } else { + dout(20) << "MDS[" << info.name << "] addr unchanged: " + << metadata_addr << dendl; + } + } else { + update = true; + } + + if (update) { + daemon_state.notify_updating(k); + auto c = new MetadataUpdate(daemon_state, k); + std::ostringstream cmd; + cmd << "{\"prefix\": \"mds metadata\", \"who\": \"" + << info.name << "\"}"; + int r = monc->start_mon_command( + {cmd.str()}, + {}, &c->outbl, &c->outs, c); + assert(r == 0); // start_mon_command defined to not fail + } + } +} + + +bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, + bool force_new) +{ + if (dest_type == CEPH_ENTITY_TYPE_MON) + return true; + + if (force_new) { + if (monc->wait_auth_rotating(10) < 0) + return false; + } + + *authorizer = monc->auth->build_authorizer(dest_type); + return *authorizer != NULL; +} + + +int Mgr::main(vector args) +{ + return py_modules.main(args); +} + + +void Mgr::handle_mgr_digest(MMgrDigest* m) +{ + dout(10) << m->mon_status_json.length() << dendl; + dout(10) << m->health_json.length() << dendl; + cluster_state.load_digest(m); + py_modules.notify_all("mon_status", ""); + py_modules.notify_all("health", ""); + + // Hack: use this as a tick/opportunity to prompt python-land that + // the pgmap might have changed since last time we were here. + py_modules.notify_all("pg_summary", ""); + + m->put(); +} + diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h new file mode 100644 index 000000000000..bc33c6adfafe --- /dev/null +++ b/src/mgr/Mgr.h @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef CEPH_PYFOO_H_ +#define CEPH_PYFOO_H_ + +// Python.h comes first because otherwise it clobbers ceph's assert +#include "Python.h" +// Python's pyconfig-64.h conflicts with ceph's acconfig.h +#undef HAVE_SYS_WAIT_H +#undef HAVE_UNISTD_H +#undef HAVE_UTIME_H +#undef _POSIX_C_SOURCE +#undef _XOPEN_SOURCE + +#include "osdc/Objecter.h" +#include "mds/FSMap.h" +#include "messages/MFSMap.h" +#include "msg/Dispatcher.h" +#include "msg/Messenger.h" +#include "auth/Auth.h" +#include "common/Finisher.h" +#include "common/Timer.h" + +#include "DaemonServer.h" +#include "PyModules.h" + +#include "DaemonMetadata.h" +#include "ClusterState.h" + +class MCommand; +class MMgrDigest; + + +class MgrPyModule; + +class Mgr : public Dispatcher { +protected: + Objecter *objecter; + MonClient *monc; + Messenger *client_messenger; + + Mutex lock; + SafeTimer timer; + Finisher finisher; + + Context *waiting_for_fs_map; + + PyModules py_modules; + DaemonMetadataIndex daemon_state; + ClusterState cluster_state; + + DaemonServer server; + + void load_config(); + void load_all_metadata(); + +public: + Mgr(); + ~Mgr(); + + void handle_mgr_digest(MMgrDigest* m); + void handle_fs_map(MFSMap* m); + void handle_osd_map(); + bool ms_dispatch(Message *m); + bool ms_handle_reset(Connection *con) { return false; } + void ms_handle_remote_reset(Connection *con) {} + bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, + bool force_new); + int init(); + void shutdown(); + void usage() {} + int main(vector args); +}; + +#endif /* MDS_UTILITY_H_ */ diff --git a/src/mgr/MgrContext.h b/src/mgr/MgrContext.h new file mode 100644 index 000000000000..eae7d756ae6b --- /dev/null +++ b/src/mgr/MgrContext.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef MGR_CONTEXT_H_ +#define MGR_CONTEXT_H_ + +#include +#include "include/Context.h" + +#include "common/ceph_json.h" +#include "mon/MonClient.h" + +class C_StdFunction : public Context +{ +private: + std::function fn; + +public: + C_StdFunction(std::function fn_) + : fn(fn_) + {} + + void finish(int r) + { + fn(); + } +}; + +class Command +{ +protected: + C_SaferCond cond; +public: + bufferlist outbl; + std::string outs; + int r; + + void run(MonClient *monc, const std::string &command) + { + monc->start_mon_command({command}, {}, + &outbl, &outs, &cond); + } + + virtual void wait() + { + r = cond.wait(); + } + + virtual ~Command() {} +}; + + +class JSONCommand : public Command +{ +public: + json_spirit::mValue json_result; + + void wait() + { + Command::wait(); + + if (r == 0) { + bool read_ok = json_spirit::read( + outbl.to_str(), json_result); + if (!read_ok) { + r = -EINVAL; + } + } + } +}; + +#endif + diff --git a/src/mgr/MgrPyModule.cc b/src/mgr/MgrPyModule.cc new file mode 100644 index 000000000000..c1dee9f1968c --- /dev/null +++ b/src/mgr/MgrPyModule.cc @@ -0,0 +1,201 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#include "PyFormatter.h" + +#include "common/debug.h" + +#include "MgrPyModule.h" + + +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + +MgrPyModule::MgrPyModule(const std::string &module_name_) + : module_name(module_name_), pModule(nullptr), pClass(nullptr), + pClassInstance(nullptr) +{} + +MgrPyModule::~MgrPyModule() +{ + Py_XDECREF(pModule); + Py_XDECREF(pClass); + Py_XDECREF(pClassInstance); +} + +int MgrPyModule::load() +{ + // Load the module + PyObject *pName = PyString_FromString(module_name.c_str()); + pModule = PyImport_Import(pName); + Py_DECREF(pName); + if (pModule == nullptr) { + derr << "Module not found: '" << module_name << "'" << dendl; + return -ENOENT; + } + + // Find the class + // TODO: let them call it what they want instead of just 'Module' + pClass = PyObject_GetAttrString(pModule, (const char*)"Module"); + if (pClass == nullptr) { + derr << "Class not found in module '" << module_name << "'" << dendl; + return -EINVAL; + } + + + // Just using the module name as the handle, replace with a + // uuidish thing if needed + auto pyHandle = PyString_FromString(module_name.c_str()); + auto pArgs = PyTuple_Pack(1, pyHandle); + pClassInstance = PyObject_CallObject(pClass, pArgs); + if (pClassInstance == nullptr) { + derr << "Failed to construct class in '" << module_name << "'" << dendl; + return -EINVAL; + } else { + dout(1) << "Constructed class from module: " << module_name << dendl; + } + Py_DECREF(pArgs); + + return load_commands(); +} + +int MgrPyModule::serve() +{ + assert(pClassInstance != nullptr); + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + auto pValue = PyObject_CallMethod(pClassInstance, + (const char*)"serve", (const char*)"()"); + + if (pValue != NULL) { + Py_DECREF(pValue); + } else { + PyErr_Print(); + return -EINVAL; + } + + PyGILState_Release(gstate); + + return 0; +} + +void MgrPyModule::notify(const std::string ¬ify_type, const std::string ¬ify_id) +{ + assert(pClassInstance != nullptr); + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + // Execute + auto pValue = PyObject_CallMethod(pClassInstance, (const char*)"notify", "(ss)", + notify_type.c_str(), notify_id.c_str()); + + if (pValue != NULL) { + Py_DECREF(pValue); + } else { + PyErr_Print(); + // FIXME: callers can't be expected to handle a python module + // that has spontaneously broken, but Mgr() should provide + // a hook to unload misbehaving modules when they have an + // error somewhere like this + } + + PyGILState_Release(gstate); +} + +int MgrPyModule::load_commands() +{ + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + PyObject *command_list = PyObject_GetAttrString(pClassInstance, "COMMANDS"); + assert(command_list != nullptr); + const size_t list_size = PyList_Size(command_list); + for (size_t i = 0; i < list_size; ++i) { + PyObject *command = PyList_GetItem(command_list, i); + assert(command != nullptr); + + ModuleCommand item; + + PyObject *pCmd = PyDict_GetItemString(command, "cmd"); + assert(pCmd != nullptr); + item.cmdstring = PyString_AsString(pCmd); + + dout(20) << "loaded command " << item.cmdstring << dendl; + + PyObject *pDesc = PyDict_GetItemString(command, "desc"); + assert(pDesc != nullptr); + item.helpstring = PyString_AsString(pDesc); + + PyObject *pPerm = PyDict_GetItemString(command, "perm"); + assert(pPerm != nullptr); + item.perm = PyString_AsString(pPerm); + + item.handler = this; + + commands.push_back(item); + } + Py_DECREF(command_list); + + PyGILState_Release(gstate); + + dout(10) << "loaded " << commands.size() << " commands" << dendl; + + return 0; +} + +int MgrPyModule::handle_command( + const cmdmap_t &cmdmap, + std::stringstream *ss, + std::stringstream *ds) +{ + assert(ss != nullptr); + assert(ds != nullptr); + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + PyFormatter f; + cmdmap_dump(cmdmap, &f); + PyObject *py_cmd = f.get(); + + auto pResult = PyObject_CallMethod(pClassInstance, + (const char*)"handle_command", (const char*)"(O)", py_cmd); + + Py_DECREF(py_cmd); + + int r = 0; + if (pResult != NULL) { + if (PyTuple_Size(pResult) != 3) { + r = -EINVAL; + } else { + r = PyInt_AsLong(PyTuple_GetItem(pResult, 0)); + *ds << PyString_AsString(PyTuple_GetItem(pResult, 1)); + *ss << PyString_AsString(PyTuple_GetItem(pResult, 2)); + } + + Py_DECREF(pResult); + } else { + PyErr_Print(); + r = -EINVAL; + } + + PyGILState_Release(gstate); + + return r; +} + diff --git a/src/mgr/MgrPyModule.h b/src/mgr/MgrPyModule.h new file mode 100644 index 000000000000..8a5dc94edef7 --- /dev/null +++ b/src/mgr/MgrPyModule.h @@ -0,0 +1,74 @@ + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef MGR_PY_MODULE_H_ +#define MGR_PY_MODULE_H_ + +// Python.h comes first because otherwise it clobbers ceph's assert +#include "Python.h" + +#include "common/cmdparse.h" + +#include +#include + + +class MgrPyModule; + +/** + * A Ceph CLI command description provided from a Python module + */ +class ModuleCommand { +public: + std::string cmdstring; + std::string helpstring; + std::string perm; + MgrPyModule *handler; +}; + +class MgrPyModule +{ +private: + const std::string module_name; + PyObject *pModule; + PyObject *pClass; + PyObject *pClassInstance; + + + std::vector commands; + + int load_commands(); + +public: + MgrPyModule(const std::string &module_name); + ~MgrPyModule(); + + int load(); + int serve(); + void notify(const std::string ¬ify_type, const std::string ¬ify_id); + + const std::vector &get_commands() const + { + return commands; + } + + int handle_command( + const cmdmap_t &cmdmap, + std::stringstream *ss, + std::stringstream *ds); +}; + +#endif + diff --git a/src/mgr/PyFormatter.cc b/src/mgr/PyFormatter.cc new file mode 100644 index 000000000000..55154c79f225 --- /dev/null +++ b/src/mgr/PyFormatter.cc @@ -0,0 +1,123 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Red Hat Inc + * + * Author: John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "PyFormatter.h" + + +void PyFormatter::open_array_section(const char *name) +{ + PyObject *list = PyList_New(0); + dump_pyobject(name, list); + stack.push(cursor); + cursor = list; +} + +void PyFormatter::open_object_section(const char *name) +{ + PyObject *dict = PyDict_New(); + dump_pyobject(name, dict); + stack.push(cursor); + cursor = dict; +} + +void PyFormatter::dump_unsigned(const char *name, uint64_t u) +{ + PyObject *p = PyLong_FromLongLong(u); + assert(p); + dump_pyobject(name, p); +} + +void PyFormatter::dump_int(const char *name, int64_t u) +{ + PyObject *p = PyLong_FromLongLong(u); + assert(p); + dump_pyobject(name, p); +} + +void PyFormatter::dump_float(const char *name, double d) +{ + dump_pyobject(name, PyFloat_FromDouble(d)); +} + +void PyFormatter::dump_string(const char *name, const std::string& s) +{ + dump_pyobject(name, PyString_FromString(s.c_str())); +} + +void PyFormatter::dump_bool(const char *name, bool b) +{ + if (b) { + Py_INCREF(Py_True); + dump_pyobject(name, Py_True); + } else { + Py_INCREF(Py_False); + dump_pyobject(name, Py_False); + } +} + +std::ostream& PyFormatter::dump_stream(const char *name) +{ + // Give the caller an ostream, construct a PyString, + // and remember the association between the two. On flush, + // we'll read from the ostream into the PyString + auto ps = std::make_shared(); + ps->cursor = cursor; + ps->name = name; + + pending_streams.push_back(ps); + + return ps->stream; +} + +void PyFormatter::dump_format_va(const char *name, const char *ns, bool quoted, const char *fmt, va_list ap) +{ + // TODO + assert(0); +} + +/** + * Steals reference to `p` + */ +void PyFormatter::dump_pyobject(const char *name, PyObject *p) +{ + if (PyList_Check(cursor)) { + PyList_Append(cursor, p); + Py_DECREF(p); + } else if (PyDict_Check(cursor)) { + PyObject *key = PyString_FromString(name); + PyDict_SetItem(cursor, key, p); + Py_DECREF(key); + Py_DECREF(p); + } else { + assert(0); + } +} + +void PyFormatter::finish_pending_streams() +{ + for (const auto &i : pending_streams) { + PyObject *tmp_cur = cursor; + cursor = i->cursor; + dump_pyobject( + i->name.c_str(), + PyString_FromString(i->stream.str().c_str())); + cursor = tmp_cur; + } + + pending_streams.clear(); +} + diff --git a/src/mgr/PyFormatter.h b/src/mgr/PyFormatter.h new file mode 100644 index 000000000000..48dd5e697ba7 --- /dev/null +++ b/src/mgr/PyFormatter.h @@ -0,0 +1,142 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Red Hat Inc + * + * Author: John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef PY_FORMATTER_H_ +#define PY_FORMATTER_H_ + +// Python.h comes first because otherwise it clobbers ceph's assert +#include "Python.h" +// Python's pyconfig-64.h conflicts with ceph's acconfig.h +#undef HAVE_SYS_WAIT_H +#undef HAVE_UNISTD_H +#undef HAVE_UTIME_H +#undef _POSIX_C_SOURCE +#undef _XOPEN_SOURCE + +#include +#include +#include + +#include "common/Formatter.h" + +class PyFormatter : public ceph::Formatter +{ +public: + PyFormatter(bool pretty = false, bool array = false) + { + // Initialise cursor to an empty dict + if (!array) { + root = cursor = PyDict_New(); + } else { + root = cursor = PyList_New(0); + } + } + + ~PyFormatter() + { + cursor = NULL; + Py_DECREF(root); + root = NULL; + } + + // Obscure, don't care. + void open_array_section_in_ns(const char *name, const char *ns) + {assert(0);} + void open_object_section_in_ns(const char *name, const char *ns) + {assert(0);} + + void reset() + { + const bool array = PyList_Check(root); + Py_DECREF(root); + if (array) { + root = cursor = PyList_New(0); + } else { + root = cursor = PyDict_New(); + } + } + + virtual void set_status(int status, const char* status_name) {} + virtual void output_header() {}; + virtual void output_footer() {}; + + + virtual void open_array_section(const char *name); + void open_object_section(const char *name); + void close_section() + { + assert(cursor != root); + assert(!stack.empty()); + cursor = stack.top(); + stack.pop(); + } + void dump_bool(const char *name, bool b); + void dump_unsigned(const char *name, uint64_t u); + void dump_int(const char *name, int64_t u); + void dump_float(const char *name, double d); + void dump_string(const char *name, const std::string& s); + std::ostream& dump_stream(const char *name); + void dump_format_va(const char *name, const char *ns, bool quoted, const char *fmt, va_list ap); + + void flush(std::ostream& os) + { + // This class is not a serializer: this doens't make sense + assert(0); + } + + int get_len() const + { + // This class is not a serializer: this doens't make sense + assert(0); + return 0; + } + + void write_raw_data(const char *data) + { + // This class is not a serializer: this doens't make sense + assert(0); + } + + PyObject *get() + { + finish_pending_streams(); + + Py_INCREF(root); + return root; + } + + void finish_pending_streams(); + +private: + PyObject *root; + PyObject *cursor; + std::stack stack; + + void dump_pyobject(const char *name, PyObject *p); + + class PendingStream { + public: + PyObject *cursor; + std::string name; + std::stringstream stream; + }; + + std::list > pending_streams; + +}; + +#endif + diff --git a/src/mgr/PyModules.cc b/src/mgr/PyModules.cc new file mode 100644 index 000000000000..82fd2c19f7dd --- /dev/null +++ b/src/mgr/PyModules.cc @@ -0,0 +1,389 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#include "PyState.h" +#include "PyFormatter.h" + +#include "osd/OSDMap.h" +#include "mon/MonMap.h" + +#include "mgr/MgrContext.h" + +#include "PyModules.h" + +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + +void PyModules::dump_server(const std::string &hostname, + const DaemonMetadataCollection &dmc, + Formatter *f) +{ + f->dump_string("hostname", hostname); + f->open_array_section("services"); + std::string ceph_version; + + for (const auto &i : dmc) { + const auto &key = i.first; + const std::string str_type = ceph_entity_type_name(key.first); + const std::string &svc_name = key.second; + + // TODO: pick the highest version, and make sure that + // somewhere else (during health reporting?) we are + // indicating to the user if we see mixed versions + ceph_version = i.second->metadata.at("ceph_version"); + + f->open_object_section("service"); + f->dump_string("type", str_type); + f->dump_string("id", svc_name); + f->close_section(); + } + f->close_section(); + + f->dump_string("ceph_version", ceph_version); +} + + + +PyObject *PyModules::get_server_python(const std::string &hostname) +{ + PyThreadState *tstate = PyEval_SaveThread(); + Mutex::Locker l(lock); + PyEval_RestoreThread(tstate); + dout(10) << " (" << hostname << ")" << dendl; + + auto dmc = daemon_state.get_by_server(hostname); + + PyFormatter f; + dump_server(hostname, dmc, &f); + return f.get(); +} + + +PyObject *PyModules::list_servers_python() +{ + PyThreadState *tstate = PyEval_SaveThread(); + Mutex::Locker l(lock); + PyEval_RestoreThread(tstate); + dout(10) << " >" << dendl; + + PyFormatter f(false, true); + const auto &all = daemon_state.get_all_servers(); + for (const auto &i : all) { + const auto &hostname = i.first; + + f.open_object_section("server"); + dump_server(hostname, i.second, &f); + f.close_section(); + } + + return f.get(); +} + +PyObject *PyModules::get_metadata_python(std::string const &handle, + entity_type_t svc_type, const std::string &svc_id) +{ + auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id)); + PyFormatter f; + f.dump_string("hostname", metadata->hostname); + for (const auto &i : metadata->metadata) { + f.dump_string(i.first.c_str(), i.second); + } + + return f.get(); +} + + +PyObject *PyModules::get_python(const std::string &what) +{ + PyThreadState *tstate = PyEval_SaveThread(); + Mutex::Locker l(lock); + PyEval_RestoreThread(tstate); + + if (what == "fs_map") { + PyFormatter f; + cluster_state.with_fsmap([&f](const FSMap &fsmap) { + fsmap.dump(&f); + }); + return f.get(); + } else if (what == "osdmap_crush_map_text") { + bufferlist rdata; + cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){ + osd_map.crush->encode(rdata); + }); + std::string crush_text = rdata.to_str(); + return PyString_FromString(crush_text.c_str()); + } else if (what.substr(0, 7) == "osd_map") { + PyFormatter f; + cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){ + if (what == "osd_map") { + osd_map.dump(&f); + } else if (what == "osd_map_tree") { + osd_map.print_tree(&f, nullptr); + } else if (what == "osd_map_crush") { + osd_map.crush->dump(&f); + } + }); + return f.get(); + } else if (what == "config") { + PyFormatter f; + g_conf->show_config(&f); + return f.get(); + } else if (what == "mon_map") { + PyFormatter f; + cluster_state.with_monmap( + [&f](const MonMap &monmap) { + monmap.dump(&f); + } + ); + return f.get(); + } else if (what == "fs_map") { + PyFormatter f; + cluster_state.with_fsmap( + [&f](const FSMap &fsmap) { + fsmap.dump(&f); + } + ); + return f.get(); + } else if (what == "osd_metadata") { + PyFormatter f; + auto dmc = daemon_state.get_by_type(CEPH_ENTITY_TYPE_OSD); + for (const auto &i : dmc) { + f.open_object_section(i.first.second.c_str()); + f.dump_string("hostname", i.second->hostname); + for (const auto &j : i.second->metadata) { + f.dump_string(j.first.c_str(), j.second); + } + f.close_section(); + } + return f.get(); + } else if (what == "pg_summary" || what == "health" || what == "mon_status") { + PyFormatter f; + bufferlist json; + if (what == "pg_summary") { + json = cluster_state.get_pg_summary(); + } else if (what == "health") { + json = cluster_state.get_health(); + } else if (what == "mon_status") { + json = cluster_state.get_mon_status(); + } else { + assert(false); + } + f.dump_string("json", json.to_str()); + return f.get(); + } else { + derr << "Python module requested unknown data '" << what << "'" << dendl; + Py_RETURN_NONE; + } +} + +//XXX courtesy of http://stackoverflow.com/questions/1418015/how-to-get-python-exception-text +#include +// decode a Python exception into a string +std::string handle_pyerror() +{ + using namespace boost::python; + using namespace boost; + + PyObject *exc,*val,*tb; + object formatted_list, formatted; + PyErr_Fetch(&exc,&val,&tb); + handle<> hexc(exc),hval(allow_null(val)),htb(allow_null(tb)); + object traceback(import("traceback")); + if (!tb) { + object format_exception_only(traceback.attr("format_exception_only")); + formatted_list = format_exception_only(hexc,hval); + } else { + object format_exception(traceback.attr("format_exception")); + formatted_list = format_exception(hexc,hval,htb); + } + formatted = str("\n").join(formatted_list); + return extract(formatted); +} + +int PyModules::main(vector args) +{ + global_handle = this; + + // Set up global python interpreter + Py_Initialize(); + + // Some python modules do not cope with an unpopulated argv, so lets + // fake one. This step also picks up site-packages into sys.path. + const char *argv[] = {"ceph-mgr"}; + PySys_SetArgv(1, (char**)argv); + + // Populate python namespace with callable hooks + Py_InitModule("ceph_state", CephStateMethods); + + // Configure sys.path to include mgr_module_path + const std::string module_path = g_conf->mgr_module_path; + dout(4) << "Loading modules from '" << module_path << "'" << dendl; + std::string sys_path = Py_GetPath(); + + // We need site-packages for flask et al, unless we choose to + // embed them in the ceph package. site-packages is an interpreter-specific + // thing, so as an embedded interpreter we're responsible for picking + // this. FIXME: don't hardcode this. + std::string site_packages = "/usr/lib/python2.7/site-packages:/usr/lib64/python2.7/site-packages:/usr/lib64/python2.7"; + sys_path += ":"; + sys_path += site_packages; + + sys_path += ":"; + sys_path += module_path; + dout(10) << "Computed sys.path '" << sys_path << "'" << dendl; + PySys_SetPath((char*)(sys_path.c_str())); + + // Let CPython know that we will be calling it back from other + // threads in future. + if (! PyEval_ThreadsInitialized()) { + PyEval_InitThreads(); + } + + // Load python code + // TODO load mgr_modules list, run them all in a thread each. + auto mod = new MgrPyModule("rest"); + int r = mod->load(); + if (r != 0) { + derr << "Error loading python module" << dendl; + derr << handle_pyerror() << dendl; +#if 0 + PyObject *ptype, *pvalue, *ptraceback; + PyErr_Fetch(&ptype, &pvalue, &ptraceback); + if (ptype) { + if (pvalue) { + char *pStrErrorMessage = PyString_AsString(pvalue); + +XXX why is pvalue giving null when converted to string? + + assert(pStrErrorMessage != nullptr); + derr << "Exception: " << pStrErrorMessage << dendl; + Py_DECREF(ptraceback); + Py_DECREF(pvalue); + } + Py_DECREF(ptype); + } +#endif + + // FIXME: be tolerant of bad modules, log an error and continue + // to load other, healthy modules. + return r; + } + { + Mutex::Locker locker(lock); + modules["rest"] = mod; + } + + // Execute python server + mod->serve(); + + { + Mutex::Locker locker(lock); + // Tear down modules + for (auto i : modules) { + delete i.second; + } + modules.clear(); + } + + Py_Finalize(); + return 0; +} + +void PyModules::notify_all(const std::string ¬ify_type, + const std::string ¬ify_id) +{ + Mutex::Locker l(lock); + + dout(10) << __func__ << ": notify_all " << notify_type << dendl; + for (auto i : modules) { + auto module = i.second; + // Send all python calls down a Finisher to avoid blocking + // C++ code, and avoid any potential lock cycles. + finisher.queue(new C_StdFunction([module, notify_type, notify_id](){ + module->notify(notify_type, notify_id); + })); + } +} + +bool PyModules::get_config(const std::string &handle, + const std::string &key, std::string *val) const +{ + const std::string global_key = config_prefix + handle + "." + key; + + if (config_cache.count(global_key)) { + *val = config_cache.at(global_key); + return true; + } else { + return false; + } +} + +void PyModules::set_config(const std::string &handle, + const std::string &key, const std::string &val) +{ + const std::string global_key = config_prefix + handle + "." + key; + + config_cache[global_key] = val; + + std::ostringstream cmd_json; + Command set_cmd; + + JSONFormatter jf; + jf.open_object_section("cmd"); + jf.dump_string("prefix", "config-key put"); + jf.dump_string("key", global_key); + jf.dump_string("val", val); + jf.close_section(); + jf.flush(cmd_json); + + set_cmd.run(&monc, cmd_json.str()); + set_cmd.wait(); + + // FIXME: is config-key put ever allowed to fail? + assert(set_cmd.r == 0); +} + +std::vector PyModules::get_commands() +{ + Mutex::Locker l(lock); + + std::vector result; + for (auto i : modules) { + auto module = i.second; + auto mod_commands = module->get_commands(); + for (auto j : mod_commands) { + result.push_back(j); + } + } + + return result; +} + +void PyModules::insert_config(const std::map &new_config) +{ + dout(4) << "Loaded " << new_config.size() << " config settings" << dendl; + config_cache = new_config; +} + +void PyModules::log(const std::string &handle, + unsigned level, const std::string &record) +{ +#undef dout_prefix +#define dout_prefix *_dout << "mgr[" << handle << "] " + dout(level) << record << dendl; +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " +} diff --git a/src/mgr/PyModules.h b/src/mgr/PyModules.h new file mode 100644 index 000000000000..302063697315 --- /dev/null +++ b/src/mgr/PyModules.h @@ -0,0 +1,92 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef PY_MODULES_H_ +#define PY_MODULES_H_ + +#include "MgrPyModule.h" + +#include "common/Finisher.h" +#include "common/Mutex.h" + + +#include "DaemonMetadata.h" +#include "ClusterState.h" + + + + + +class PyModules +{ + protected: + std::map modules; + + DaemonMetadataIndex &daemon_state; + ClusterState &cluster_state; + MonClient &monc; + Finisher &finisher; + + Mutex lock; + +public: + static constexpr auto config_prefix = "mgr."; + + PyModules(DaemonMetadataIndex &ds, ClusterState &cs, MonClient &mc, + Finisher &f) + : daemon_state(ds), cluster_state(cs), monc(mc), finisher(f), + lock("PyModules") + { + } + + // FIXME: wrap for send_command? + MonClient &get_monc() {return monc;} + + PyObject *get_python(const std::string &what); + PyObject *get_server_python(const std::string &hostname); + PyObject *list_servers_python(); + PyObject *get_metadata_python(std::string const &handle, + entity_type_t svc_type, const std::string &svc_id); + + std::map config_cache; + + std::vector get_commands(); + + void insert_config(const std::map &new_config); + + // Public so that MonCommandCompletion can use it + // FIXME: bit weird that we're sending command completions + // to all modules (we rely on them to ignore anything that + // they don't recognise), but when we get called from + // python-land we don't actually know who we are. Need + // to give python-land a handle in initialisation. + void notify_all(const std::string ¬ify_type, + const std::string ¬ify_id); + + int main(std::vector args); + + void dump_server(const std::string &hostname, + const DaemonMetadataCollection &dmc, + Formatter *f); + + bool get_config(const std::string &handle, + const std::string &key, std::string *val) const; + void set_config(const std::string &handle, + const std::string &key, const std::string &val); + + void log(const std::string &handle, + unsigned level, const std::string &record); +}; + +#endif + diff --git a/src/mgr/PyState.cc b/src/mgr/PyState.cc new file mode 100644 index 000000000000..6e1534dad644 --- /dev/null +++ b/src/mgr/PyState.cc @@ -0,0 +1,235 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +/** + * The interface we present to python code that runs within + * ceph-mgr. + */ + +#include "Mgr.h" + +#include "mon/MonClient.h" + +#include "PyState.h" + +PyModules *global_handle = NULL; + + +class MonCommandCompletion : public Context +{ + PyObject *python_completion; + const std::string tag; + +public: + std::string outs; + bufferlist outbl; + + MonCommandCompletion(PyObject* ev, const std::string &tag_) + : python_completion(ev), tag(tag_) + { + assert(python_completion != nullptr); + Py_INCREF(python_completion); + } + + ~MonCommandCompletion() + { + Py_DECREF(python_completion); + } + + void finish(int r) + { + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + auto set_fn = PyObject_GetAttrString(python_completion, "complete"); + assert(set_fn != nullptr); + + auto pyR = PyInt_FromLong(r); + auto pyOutBl = PyString_FromString(outbl.to_str().c_str()); + auto pyOutS = PyString_FromString(outs.c_str()); + auto args = PyTuple_Pack(3, pyR, pyOutBl, pyOutS); + Py_DECREF(pyR); + Py_DECREF(pyOutBl); + Py_DECREF(pyOutS); + + auto rtn = PyObject_CallObject(set_fn, args); + if (rtn != nullptr) { + Py_DECREF(rtn); + } + Py_DECREF(args); + + PyGILState_Release(gstate); + + global_handle->notify_all("command", tag); + } +}; + + +static PyObject* +ceph_send_command(PyObject *self, PyObject *args) +{ + char *handle = nullptr; + char *cmd_json = nullptr; + char *tag = nullptr; + PyObject *completion = nullptr; + if (!PyArg_ParseTuple(args, "sOss:ceph_send_command", + &handle, &completion, &cmd_json, &tag)) { + return nullptr; + } + + auto set_fn = PyObject_GetAttrString(completion, "complete"); + if (set_fn == nullptr) { + assert(0); // TODO raise python exception instead + } else { + assert(PyCallable_Check(set_fn)); + } + Py_DECREF(set_fn); + + auto c = new MonCommandCompletion(completion, tag); + auto r = global_handle->get_monc().start_mon_command( + {cmd_json}, + {}, + &c->outbl, + &c->outs, + c); + assert(r == 0); // start_mon_command is forbidden to fail + + Py_RETURN_NONE; +} + + +static PyObject* +ceph_state_get(PyObject *self, PyObject *args) +{ + char *handle = nullptr; + char *what = NULL; + if (!PyArg_ParseTuple(args, "ss:ceph_state_get", &handle, &what)) { + return NULL; + } + + return global_handle->get_python(what); +} + + +static PyObject* +ceph_get_server(PyObject *self, PyObject *args) +{ + char *handle = nullptr; + char *hostname = NULL; + if (!PyArg_ParseTuple(args, "sz:ceph_get_server", &handle, &hostname)) { + return NULL; + } + + if (hostname) { + return global_handle->get_server_python(hostname); + } else { + return global_handle->list_servers_python(); + } +} + +static PyObject* +ceph_config_get(PyObject *self, PyObject *args) +{ + char *handle = nullptr; + char *what = nullptr; + if (!PyArg_ParseTuple(args, "ss:ceph_config_get", &handle, &what)) { + derr << "Invalid args!" << dendl; + return nullptr; + } + + std::string value; + bool found = global_handle->get_config(handle, what, &value); + if (found) { + derr << "Found" << dendl; + return PyString_FromString(value.c_str()); + } else { + derr << "Not found" << dendl; + Py_RETURN_NONE; + } +} + +static PyObject* +ceph_config_set(PyObject *self, PyObject *args) +{ + char *handle = nullptr; + char *key = nullptr; + char *value = nullptr; + if (!PyArg_ParseTuple(args, "sss:ceph_config_set", &handle, &key, &value)) { + return nullptr; + } + + global_handle->set_config(handle, key, value); + + Py_RETURN_NONE; +} + +static PyObject* +get_metadata(PyObject *self, PyObject *args) +{ + char *handle = nullptr; + char *type_str = NULL; + char *svc_id = NULL; + if (!PyArg_ParseTuple(args, "sss:get_metadata", &handle, &type_str, &svc_id)) { + return nullptr; + } + + entity_type_t svc_type; + if (type_str == std::string("mds")) { + svc_type = CEPH_ENTITY_TYPE_MDS; + } else if (type_str == std::string("osd")) { + svc_type = CEPH_ENTITY_TYPE_OSD; + } else if (type_str == std::string("mon")) { + svc_type = CEPH_ENTITY_TYPE_MON; + } else { + // FIXME: form a proper exception + return nullptr; + } + + return global_handle->get_metadata_python(handle, svc_type, svc_id); +} + +static PyObject* +ceph_log(PyObject *self, PyObject *args) +{ + int level = 0; + char *record = nullptr; + char *handle = nullptr; + if (!PyArg_ParseTuple(args, "sis:log", &handle, &level, &record)) { + return nullptr; + } + + global_handle->log(handle, level, record); + + Py_RETURN_NONE; +} + + + +PyMethodDef CephStateMethods[] = { + {"get", ceph_state_get, METH_VARARGS, + "Get a cluster object"}, + {"get_server", ceph_get_server, METH_VARARGS, + "Get a server object"}, + {"get_metadata", get_metadata, METH_VARARGS, + "Get a service's metadata"}, + {"send_command", ceph_send_command, METH_VARARGS, + "Send a mon command"}, + {"get_config", ceph_config_get, METH_VARARGS, + "Get a configuration value"}, + {"set_config", ceph_config_set, METH_VARARGS, + "Set a configuration value"}, + {"log", ceph_log, METH_VARARGS, + "Emit a (local) log message"}, + {NULL, NULL, 0, NULL} +}; + diff --git a/src/mgr/PyState.h b/src/mgr/PyState.h new file mode 100644 index 000000000000..e53296b07fa2 --- /dev/null +++ b/src/mgr/PyState.h @@ -0,0 +1,12 @@ +#ifndef PYSTATE_H_ +#define PYSTATE_H_ + +#include "Python.h" + +class PyModules; + +extern PyModules *global_handle; +extern PyMethodDef CephStateMethods[]; + +#endif +