]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: create ceph-mgr service
authorJohn Spray <john.spray@redhat.com>
Thu, 30 Jun 2016 13:05:02 +0000 (14:05 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 29 Sep 2016 16:26:54 +0000 (17:26 +0100)
Signed-off-by: John Spray <john.spray@redhat.com>
18 files changed:
src/ceph_mgr.cc [new file with mode: 0644]
src/mgr/ClusterState.cc [new file with mode: 0644]
src/mgr/ClusterState.h [new file with mode: 0644]
src/mgr/DaemonMetadata.cc [new file with mode: 0644]
src/mgr/DaemonMetadata.h [new file with mode: 0644]
src/mgr/DaemonServer.cc [new file with mode: 0644]
src/mgr/DaemonServer.h [new file with mode: 0644]
src/mgr/Mgr.cc [new file with mode: 0644]
src/mgr/Mgr.h [new file with mode: 0644]
src/mgr/MgrContext.h [new file with mode: 0644]
src/mgr/MgrPyModule.cc [new file with mode: 0644]
src/mgr/MgrPyModule.h [new file with mode: 0644]
src/mgr/PyFormatter.cc [new file with mode: 0644]
src/mgr/PyFormatter.h [new file with mode: 0644]
src/mgr/PyModules.cc [new file with mode: 0644]
src/mgr/PyModules.h [new file with mode: 0644]
src/mgr/PyState.cc [new file with mode: 0644]
src/mgr/PyState.h [new file with mode: 0644]

diff --git a/src/ceph_mgr.cc b/src/ceph_mgr.cc
new file mode 100644 (file)
index 0000000..4a0fc75
--- /dev/null
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat Inc
+ *
+ * Author: John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "mgr/Mgr.h"
+
+#include "include/types.h"
+#include "common/config.h"
+#include "common/ceph_argparse.h"
+#include "common/errno.h"
+#include "global/global_init.h"
+
+
+int main(int argc, const char **argv)
+{
+  vector<const char*> args;
+  argv_to_vec(argc, argv, args);
+  env_to_vec(args);
+
+  global_init(NULL, args, CEPH_ENTITY_TYPE_MGR, CODE_ENVIRONMENT_DAEMON, 0,
+              "mgr_data");
+  common_init_finish(g_ceph_context);
+  // For consumption by KeyRing::from_ceph_context in MonClient
+  g_conf->set_val("keyring", "$mgr_data/keyring", false);
+
+  Mgr mgr;
+
+  // Handle --help before calling init() so we don't depend on network.
+  if ((args.size() == 1 && (std::string(args[0]) == "--help" || std::string(args[0]) == "-h"))) {
+    mgr.usage();
+    return 0;
+  }
+
+  global_init_daemonize(g_ceph_context);
+  global_init_chdir(g_ceph_context);
+  common_init_finish(g_ceph_context);
+
+  // Connect to mon cluster, download MDS map etc
+  int rc = mgr.init();
+  if (rc != 0) {
+      std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl;
+      return rc;
+  }
+
+  // Finally, execute the user's commands
+  rc = mgr.main(args);
+  if (rc != 0) {
+    std::cerr << "Error (" << cpp_strerror(rc) << ")" << std::endl;
+  }
+
+  mgr.shutdown();
+
+  return rc;
+}
+
diff --git a/src/mgr/ClusterState.cc b/src/mgr/ClusterState.cc
new file mode 100644 (file)
index 0000000..1e16810
--- /dev/null
@@ -0,0 +1,43 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "messages/MMgrDigest.h"
+
+#include "mgr/ClusterState.h"
+
+
+ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_)
+  : monc(monc_), objecter(objecter_), lock("ClusterState")
+{}
+
+void ClusterState::set_objecter(Objecter *objecter_)
+{
+  Mutex::Locker l(lock);
+
+  objecter = objecter_;
+}
+
+void ClusterState::set_fsmap(FSMap const &new_fsmap)
+{
+  Mutex::Locker l(lock);
+
+  fsmap = new_fsmap;
+}
+
+void ClusterState::load_digest(MMgrDigest *m)
+{
+  pg_summary_json = std::move(m->pg_summary_json);
+  health_json = std::move(m->health_json);
+  mon_status_json = std::move(m->mon_status_json);
+}
+
diff --git a/src/mgr/ClusterState.h b/src/mgr/ClusterState.h
new file mode 100644 (file)
index 0000000..44f6907
--- /dev/null
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef CLUSTER_STATE_H_
+#define CLUSTER_STATE_H_
+
+#include "mds/FSMap.h"
+#include "common/Mutex.h"
+
+#include "osdc/Objecter.h"
+#include "mon/MonClient.h"
+
+class MMgrDigest;
+
+
+/**
+ * Cluster-scope state (things like cluster maps) as opposed
+ * to daemon-level state (things like perf counters and smart)
+ */
+class ClusterState
+{
+protected:
+  MonClient *monc;
+  Objecter *objecter;
+  FSMap fsmap;
+  Mutex lock;
+
+  bufferlist pg_summary_json;
+  bufferlist health_json;
+  bufferlist mon_status_json;
+
+public:
+
+  void load_digest(MMgrDigest *m);
+
+  const bufferlist &get_pg_summary() const {return pg_summary_json;}
+  const bufferlist &get_health() const {return health_json;}
+  const bufferlist &get_mon_status() const {return mon_status_json;}
+
+  ClusterState(MonClient *monc_, Objecter *objecter_);
+
+  void set_objecter(Objecter *objecter_);
+  void set_fsmap(FSMap const &new_fsmap);
+
+  template<typename Callback, typename...Args>
+  void with_fsmap(Callback&& cb, Args&&...args)
+  {
+  Mutex::Locker l(lock);
+  std::forward<Callback>(cb)(const_cast<const FSMap&>(fsmap),
+      std::forward<Args>(args)...);
+  }
+
+  template<typename Callback, typename...Args>
+  void with_monmap(Callback&& cb, Args&&...args)
+  {
+    Mutex::Locker l(lock);
+    assert(monc != nullptr);
+    monc->with_monmap(cb);
+  }
+
+  template<typename Callback, typename...Args>
+  void with_osdmap(Callback&& cb, Args&&...args)
+  {
+    Mutex::Locker l(lock);
+    assert(objecter != nullptr);
+    objecter->with_osdmap(cb);
+  }
+};
+
+#endif
+
diff --git a/src/mgr/DaemonMetadata.cc b/src/mgr/DaemonMetadata.cc
new file mode 100644 (file)
index 0000000..dc3885c
--- /dev/null
@@ -0,0 +1,132 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "DaemonMetadata.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+void DaemonMetadataIndex::insert(DaemonMetadataPtr dm)
+{
+  if (exists(dm->key)) {
+    _erase(dm->key);
+  }
+
+  by_server[dm->hostname][dm->key] = dm;
+  all[dm->key] = dm;
+}
+
+void DaemonMetadataIndex::_erase(DaemonKey dmk)
+{
+  assert(lock.is_locked_by_me());
+
+  const auto dm = all.at(dmk);
+  auto &server_collection = by_server[dm->hostname];
+  server_collection.erase(dm->key);
+  if (server_collection.empty()) {
+    by_server.erase(dm->hostname);
+  }
+
+  all.erase(dmk);
+}
+
+DaemonMetadataCollection DaemonMetadataIndex::get_by_type(uint8_t type) const
+{
+  DaemonMetadataCollection result;
+
+  for (const auto &i : all) {
+    if (i.first.first == type) {
+      result[i.first] = i.second;
+    }
+  }
+
+  return result;
+}
+
+DaemonMetadataCollection DaemonMetadataIndex::get_by_server(const std::string &hostname) const
+{
+  if (by_server.count(hostname)) {
+    return by_server.at(hostname);
+  } else {
+    return {};
+  }
+}
+
+bool DaemonMetadataIndex::exists(const DaemonKey &key) const
+{
+  return all.count(key) > 0;
+}
+
+DaemonMetadataPtr DaemonMetadataIndex::get(const DaemonKey &key)
+{
+  return all.at(key);
+}
+
+void DaemonMetadataIndex::cull(entity_type_t daemon_type,
+                               std::set<std::string> names_exist)
+{
+  Mutex::Locker l(lock);
+
+  std::set<DaemonKey> victims;
+
+  for (const auto &i : all) {
+    if (i.first.first != daemon_type) {
+      continue;
+    }
+
+    if (names_exist.count(i.first.second) == 0) {
+      victims.insert(i.first);
+    }
+  }
+
+  for (const auto &i : victims) {
+    dout(4) << "Removing data for " << i << dendl;
+    _erase(i);
+  }
+}
+
+void DaemonPerfCounters::update(MMgrReport *report)
+{
+  dout(20) << "loading " << report->declare_types.size() << " new types, "
+           << report->packed.length() << " bytes of data" << dendl;
+
+  // Load any newly declared types
+  for (const auto &t : report->declare_types) {
+    types.insert(std::make_pair(t.path, t));
+    declared_types.insert(t.path);
+  }
+
+  // Parse packed data according to declared set of types
+  bufferlist::iterator p = report->packed.begin();
+  DECODE_START(1, p);
+  for (const auto &t_path : declared_types) {
+    const auto &t = types.at(t_path);
+    uint64_t val = 0;
+    uint64_t avgcount = 0;
+    uint64_t avgcount2 = 0;
+
+    ::decode(val, p);
+    if (t.type & PERFCOUNTER_LONGRUNAVG) {
+      ::decode(avgcount, p);
+      ::decode(avgcount2, p);
+    }
+    // TODO: interface for insertion of avgs, add timestamp
+    instances[t_path].push(val);
+  }
+  // TODO: handle badly encoded things without asserting out
+  DECODE_FINISH(p);
+}
+
+
+
diff --git a/src/mgr/DaemonMetadata.h b/src/mgr/DaemonMetadata.h
new file mode 100644 (file)
index 0000000..4aa406c
--- /dev/null
@@ -0,0 +1,149 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef DAEMON_METADATA_H_
+#define DAEMON_METADATA_H_
+
+// TODO: rename me to DaemonState from DaemonMetadata
+
+#include <map>
+#include <string>
+#include <memory>
+#include <set>
+
+#include "common/Mutex.h"
+
+#include "msg/msg_types.h"
+
+// For PerfCounterType
+#include "messages/MMgrReport.h"
+
+
+// Unique reference to a daemon within a cluster
+typedef std::pair<entity_type_t, std::string> DaemonKey;
+
+// An instance of a performance counter type, within
+// a particular daemon.
+class PerfCounterInstance
+{
+  // TODO: store some short history or whatever
+  uint64_t current;
+  public:
+  void push(uint64_t const &v) {current = v;}
+};
+
+
+typedef std::map<std::string, PerfCounterType> PerfCounterTypes;
+
+// Performance counters for one daemon
+class DaemonPerfCounters
+{
+  public:
+  // The record of perf stat types, shared between daemons
+  PerfCounterTypes &types;
+
+  DaemonPerfCounters(PerfCounterTypes &types_)
+    : types(types_)
+  {}
+
+  std::map<std::string, PerfCounterInstance> instances;
+
+  // FIXME: this state is really local to DaemonServer, it's part
+  // of the protocol rather than being part of what other classes
+  // mgiht want to read.  Maybe have a separate session object
+  // inside DaemonServer instead of stashing session-ish state here?
+  std::set<std::string> declared_types;
+
+  void update(MMgrReport *report);
+};
+
+// The state that we store about one daemon
+class DaemonMetadata
+{
+  public:
+  DaemonKey key;
+
+  // The hostname where daemon was last seen running (extracted
+  // from the metadata)
+  std::string hostname;
+
+  // The metadata (hostname, version, etc) sent from the daemon
+  std::map<std::string, std::string> metadata;
+
+  // The perf counters received in MMgrReport messages
+  DaemonPerfCounters perf_counters;
+
+  DaemonMetadata(PerfCounterTypes &types_)
+    : perf_counters(types_)
+  {
+  }
+};
+
+typedef std::shared_ptr<DaemonMetadata> DaemonMetadataPtr;
+typedef std::map<DaemonKey, DaemonMetadataPtr> DaemonMetadataCollection;
+
+
+
+
+/**
+ * Fuse the collection of per-daemon metadata from Ceph into
+ * a view that can be queried by service type, ID or also
+ * by server (aka fqdn).
+ */
+class DaemonMetadataIndex
+{
+  private:
+  std::map<std::string, DaemonMetadataCollection> by_server;
+  DaemonMetadataCollection all;
+
+  std::set<DaemonKey> updating;
+
+  Mutex lock;
+
+  public:
+
+  DaemonMetadataIndex() : lock("DaemonState") {}
+
+  // FIXME: shouldn't really be public, maybe construct DaemonMetadata
+  // objects internally to avoid this.
+  PerfCounterTypes types;
+
+  void insert(DaemonMetadataPtr dm);
+  void _erase(DaemonKey dmk);
+
+  bool exists(const DaemonKey &key) const;
+  DaemonMetadataPtr get(const DaemonKey &key);
+  DaemonMetadataCollection get_by_server(const std::string &hostname) const;
+  DaemonMetadataCollection get_by_type(uint8_t type) const;
+
+  const DaemonMetadataCollection &get_all() const {return all;}
+  const std::map<std::string, DaemonMetadataCollection> &get_all_servers() const
+  {
+    return by_server;
+  }
+
+  void notify_updating(const DaemonKey &k) { updating.insert(k); }
+  void clear_updating(const DaemonKey &k) { updating.erase(k); }
+  bool is_updating(const DaemonKey &k) { return updating.count(k) > 0; }
+
+  /**
+   * Remove state for all daemons of this type whose names are
+   * not present in `names_exist`.  Use this function when you have
+   * a cluster map and want to ensure that anything absent in the map
+   * is also absent in this class.
+   */
+  void cull(entity_type_t daemon_type, std::set<std::string> names_exist);
+};
+
+#endif
+
diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc
new file mode 100644 (file)
index 0000000..bfd551d
--- /dev/null
@@ -0,0 +1,309 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "DaemonServer.h"
+
+#include "messages/MMgrOpen.h"
+#include "messages/MMgrConfigure.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr.server " << __func__ << " "
+
+DaemonServer::DaemonServer(MonClient *monc_,
+  DaemonMetadataIndex &daemon_state_,
+  PyModules &py_modules_)
+    : Dispatcher(g_ceph_context), msgr(nullptr), monc(monc_),
+      daemon_state(daemon_state_),
+      py_modules(py_modules_),
+      auth_registry(g_ceph_context,
+                    g_conf->auth_supported.empty() ?
+                      g_conf->auth_cluster_required :
+                      g_conf->auth_supported),
+      lock("DaemonServer")
+{}
+
+DaemonServer::~DaemonServer() {
+  delete msgr;
+}
+
+int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
+{
+  // Initialize Messenger
+  msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
+      entity_name_t::MGR(gid), "server", getpid());
+  int r = msgr->bind(g_conf->public_addr);
+  if (r < 0)
+    return r;
+
+  msgr->set_myname(entity_name_t::MGR(gid));
+  msgr->set_addr_unknowns(client_addr);
+
+  msgr->start();
+  msgr->add_dispatcher_tail(this);
+
+  return 0;
+}
+
+entity_addr_t DaemonServer::get_myaddr() const
+{
+  return msgr->get_myaddr();
+}
+
+
+bool DaemonServer::ms_verify_authorizer(Connection *con,
+    int peer_type,
+    int protocol,
+    ceph::bufferlist& authorizer_data,
+    ceph::bufferlist& authorizer_reply,
+    bool& is_valid,
+    CryptoKey& session_key)
+{
+  auto handler = auth_registry.get_handler(protocol);
+  if (!handler) {
+    dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
+    assert(0);
+    is_valid = false;
+    return true;
+  }
+
+  AuthCapsInfo caps_info;
+  EntityName name;
+  uint64_t global_id = 0;
+
+  is_valid = handler->verify_authorizer(cct, monc->rotating_secrets,
+                                                 authorizer_data,
+                                                  authorizer_reply, name,
+                                                  global_id, caps_info,
+                                                  session_key);
+
+  // TODO: invent some caps suitable for ceph-mgr
+
+  return true;
+}
+
+
+bool DaemonServer::ms_get_authorizer(int dest_type,
+    AuthAuthorizer **authorizer, bool force_new)
+{
+  dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
+
+  if (dest_type == CEPH_ENTITY_TYPE_MON) {
+    return true;
+  }
+
+  if (force_new) {
+    if (monc->wait_auth_rotating(10) < 0)
+      return false;
+  }
+
+  *authorizer = monc->auth->build_authorizer(dest_type);
+  dout(20) << "got authorizer " << *authorizer << dendl;
+  return *authorizer != NULL;
+}
+
+
+bool DaemonServer::ms_dispatch(Message *m)
+{
+  Mutex::Locker l(lock);
+
+  switch(m->get_type()) {
+    case MSG_MGR_REPORT:
+      return handle_report(static_cast<MMgrReport*>(m));
+    case MSG_MGR_OPEN:
+      return handle_open(static_cast<MMgrOpen*>(m));
+    case MSG_COMMAND:
+      return handle_command(static_cast<MCommand*>(m));
+    default:
+      dout(1) << "Unhandled message type " << m->get_type() << dendl;
+      return false;
+  };
+}
+
+void DaemonServer::shutdown()
+{
+  msgr->shutdown();
+  msgr->wait();
+}
+
+
+
+bool DaemonServer::handle_open(MMgrOpen *m)
+{
+  DaemonKey key(
+      m->get_connection()->get_peer_type(),
+      m->daemon_name);
+
+  dout(4) << "from " << m->get_connection() << " name "
+          << m->daemon_name << dendl;
+
+  auto configure = new MMgrConfigure();
+  configure->stats_period = 5;
+  m->get_connection()->send_message(configure);
+
+  m->put();
+  return true;
+}
+
+bool DaemonServer::handle_report(MMgrReport *m)
+{
+  DaemonKey key(
+      m->get_connection()->get_peer_type(),
+      m->daemon_name);
+
+  dout(4) << "from " << m->get_connection() << " name "
+          << m->daemon_name << dendl;
+
+  DaemonMetadataPtr daemon;
+  if (daemon_state.exists(key)) {
+    daemon = daemon_state.get(key);
+  } else {
+    daemon = std::make_shared<DaemonMetadata>(daemon_state.types);
+    // FIXME: crap, we don't know the hostname at this stage.
+    daemon->key = key;
+    daemon_state.insert(daemon);
+    // FIXME: we should request metadata at this stage
+  }
+
+  assert(daemon != nullptr);
+  auto daemon_counters = daemon->perf_counters;
+  daemon_counters.update(m);
+  
+  m->put();
+  return true;
+}
+
+struct MgrCommand {
+  string cmdstring;
+  string helpstring;
+  string module;
+  string perm;
+  string availability;
+} mgr_commands[] = {
+
+#define COMMAND(parsesig, helptext, module, perm, availability) \
+  {parsesig, helptext, module, perm, availability},
+
+COMMAND("foo " \
+       "name=bar,type=CephString", \
+       "do a thing", "mgr", "rw", "cli")
+};
+
+bool DaemonServer::handle_command(MCommand *m)
+{
+  int r = 0;
+  std::stringstream ss;
+  std::stringstream ds;
+  bufferlist odata;
+  std::string prefix;
+
+  assert(lock.is_locked_by_me());
+
+  cmdmap_t cmdmap;
+
+  // TODO enforce some caps
+  
+  // TODO background the call into python land so that we don't
+  // block a messenger thread on python code.
+
+  ConnectionRef con = m->get_connection();
+
+  if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
+    r = -EINVAL;
+    goto out;
+  }
+
+  dout(4) << "decoded " << cmdmap.size() << dendl;
+  cmd_getval(cct, cmdmap, "prefix", prefix);
+
+  dout(4) << "prefix=" << prefix << dendl;
+
+  if (prefix == "get_command_descriptions") {
+    int cmdnum = 0;
+
+    dout(10) << "reading commands from python modules" << dendl;
+    auto py_commands = py_modules.get_commands();
+
+    JSONFormatter f;
+    f.open_object_section("command_descriptions");
+    for (const auto &pyc : py_commands) {
+      ostringstream secname;
+      secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
+      dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring
+               << ")" << dendl;
+      dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring,
+                          "mgr", pyc.perm, "cli");
+      cmdnum++;
+    }
+#if 0
+    for (MgrCommand *cp = mgr_commands;
+        cp < &mgr_commands[ARRAY_SIZE(mgr_commands)]; cp++) {
+
+      ostringstream secname;
+      secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
+      dump_cmddesc_to_json(f, secname.str(), cp->cmdstring, cp->helpstring,
+                          cp->module, cp->perm, cp->availability);
+      cmdnum++;
+    }
+#endif
+    f.close_section(); // command_descriptions
+
+    f.flush(ds);
+    goto out;
+  } else {
+    // Let's find you a handler!
+    MgrPyModule *handler = nullptr;
+    auto py_commands = py_modules.get_commands();
+    for (const auto &pyc : py_commands) {
+      auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
+      dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
+      if (pyc_prefix == prefix) {
+        handler = pyc.handler;
+        break;
+      }
+    }
+
+    if (handler == nullptr) {
+      ss << "No handler found for '" << prefix << "'";
+      dout(4) << "No handler found for '" << prefix << "'" << dendl;
+      r = -EINVAL;
+      goto out;
+    }
+
+    // FIXME: go run this python part in another thread, not inline
+    // with a ms_dispatch, so that the python part can block if it
+    // wants to.
+    dout(4) << "passing through " << cmdmap.size() << dendl;
+    r = handler->handle_command(cmdmap, &ds, &ss);
+    goto out;
+  }
+
+ out:
+  std::string rs;
+  rs = ss.str();
+  odata.append(ds);
+  dout(1) << "do_command r=" << r << " " << rs << dendl;
+  //clog->info() << rs << "\n";
+  if (con) {
+    MCommandReply *reply = new MCommandReply(r, rs);
+    reply->set_tid(m->get_tid());
+    reply->set_data(odata);
+    con->send_message(reply);
+  }
+
+  m->put();
+  return true;
+}
+
diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h
new file mode 100644 (file)
index 0000000..ab93574
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef DAEMON_SERVER_H_
+#define DAEMON_SERVER_H_
+
+#include "PyModules.h"
+
+#include <set>
+#include <string>
+
+#include "common/Mutex.h"
+
+#include <msg/Messenger.h>
+#include <mon/MonClient.h>
+
+#include "auth/AuthAuthorizeHandler.h"
+
+#include "DaemonMetadata.h"
+
+class MMgrReport;
+class MMgrOpen;
+class MCommand;
+
+
+/**
+ * Server used in ceph-mgr to communicate with Ceph daemons like
+ * MDSs and OSDs.
+ */
+class DaemonServer : public Dispatcher
+{
+protected:
+  Messenger *msgr;
+  MonClient *monc;
+  DaemonMetadataIndex &daemon_state;
+  PyModules &py_modules;
+
+  AuthAuthorizeHandlerRegistry auth_registry;
+
+  Mutex lock;
+
+
+public:
+  int init(uint64_t gid, entity_addr_t client_addr);
+  void shutdown();
+
+  entity_addr_t get_myaddr() const;
+
+  DaemonServer(MonClient *monc_,
+      DaemonMetadataIndex &daemon_state_,
+      PyModules &py_modules_);
+  ~DaemonServer();
+
+  bool ms_dispatch(Message *m);
+  bool ms_handle_reset(Connection *con) { return false; }
+  void ms_handle_remote_reset(Connection *con) {}
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new);
+  bool ms_verify_authorizer(Connection *con,
+      int peer_type,
+      int protocol,
+      ceph::bufferlist& authorizer,
+      ceph::bufferlist& authorizer_reply,
+      bool& isvalid,
+      CryptoKey& session_key);
+
+  bool handle_open(MMgrOpen *m);
+  bool handle_report(MMgrReport *m);
+  bool handle_command(MCommand *m);
+};
+
+#endif
+
diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc
new file mode 100644 (file)
index 0000000..128426c
--- /dev/null
@@ -0,0 +1,576 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "common/errno.h"
+#include "mon/MonClient.h"
+#include "include/stringify.h"
+#include "global/global_context.h"
+
+#include "mgr/MgrContext.h"
+
+#include "MgrPyModule.h"
+#include "DaemonServer.h"
+#include "messages/MMgrBeacon.h"
+#include "messages/MMgrDigest.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+
+#include "Mgr.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+
+Mgr::Mgr() :
+  Dispatcher(g_ceph_context),
+  objecter(NULL),
+  monc(new MonClient(g_ceph_context)),
+  lock("Mgr::lock"),
+  timer(g_ceph_context, lock),
+  finisher(g_ceph_context, "Mgr", "mgr-fin"),
+  waiting_for_fs_map(NULL),
+  py_modules(daemon_state, cluster_state, *monc, finisher),
+  cluster_state(monc, nullptr),
+  server(monc, daemon_state, py_modules)
+{
+  client_messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
+
+  // FIXME: using objecter as convenience to handle incremental
+  // OSD maps, but that's overkill.  We don't really need an objecter.
+  // Could we separate out the part of Objecter that we really need?
+  objecter = new Objecter(g_ceph_context, client_messenger, monc, NULL, 0, 0);
+
+  cluster_state.set_objecter(objecter);
+}
+
+
+Mgr::~Mgr()
+{
+  delete objecter;
+  delete monc;
+  delete client_messenger;
+  assert(waiting_for_fs_map == NULL);
+}
+
+
+/**
+ * Context for completion of metadata mon commands: take
+ * the result and stash it in DaemonMetadataIndex
+ */
+class MetadataUpdate : public Context
+{
+  DaemonMetadataIndex &daemon_state;
+  DaemonKey key;
+
+public:
+  bufferlist outbl;
+  std::string outs;
+
+  MetadataUpdate(DaemonMetadataIndex &daemon_state_, const DaemonKey &key_)
+    : daemon_state(daemon_state_), key(key_) {}
+
+  void finish(int r)
+  {
+    daemon_state.clear_updating(key);
+    if (r == 0) {
+      if (key.first == CEPH_ENTITY_TYPE_MDS) {
+        json_spirit::mValue json_result;
+        bool read_ok = json_spirit::read(
+            outbl.to_str(), json_result);
+        if (!read_ok) {
+          dout(1) << "mon returned invalid JSON for "
+                  << ceph_entity_type_name(key.first)
+                  << "." << key.second << dendl;
+          return;
+        }
+
+
+        json_spirit::mObject daemon_meta = json_result.get_obj();
+        DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+        dm->key = key;
+        dm->hostname = daemon_meta.at("hostname").get_str();
+
+        daemon_meta.erase("name");
+        daemon_meta.erase("hostname");
+
+        for (const auto &i : daemon_meta) {
+          dm->metadata[i.first] = i.second.get_str();
+        }
+
+        daemon_state.insert(dm);
+      } else if (key.first == CEPH_ENTITY_TYPE_OSD) {
+      } else {
+        assert(0);
+      }
+    } else {
+      dout(1) << "mon failed to return metadata for "
+              << ceph_entity_type_name(key.first)
+              << "." << key.second << ": " << cpp_strerror(r) << dendl;
+    }
+  }
+};
+
+
+
+int Mgr::init()
+{
+  Mutex::Locker l(lock);
+
+  // Initialize Messenger
+  int r = client_messenger->bind(g_conf->public_addr);
+  if (r < 0)
+    return r;
+
+  client_messenger->start();
+
+  objecter->set_client_incarnation(0);
+  objecter->init();
+
+  // Connect dispatchers before starting objecter
+  client_messenger->add_dispatcher_tail(objecter);
+  client_messenger->add_dispatcher_tail(this);
+
+  // Initialize MonClient
+  if (monc->build_initial_monmap() < 0) {
+    objecter->shutdown();
+    client_messenger->shutdown();
+    client_messenger->wait();
+    return -1;
+  }
+
+  monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
+      |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
+  monc->set_messenger(client_messenger);
+  monc->init();
+  r = monc->authenticate();
+  if (r < 0) {
+    derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
+    monc->shutdown();
+    objecter->shutdown();
+    client_messenger->shutdown();
+    client_messenger->wait();
+    return r;
+  }
+
+  client_t whoami = monc->get_global_id();
+  client_messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+
+  // Start communicating with daemons to learn statistics etc
+  server.init(monc->get_global_id(), client_messenger->get_myaddr());
+
+  dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
+  // TODO: send the beacon periodically
+  MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(),
+                                 server.get_myaddr());
+  monc->send_mon_message(m);
+
+  // Preload all daemon metadata (will subsequently keep this
+  // up to date by watching maps, so do the initial load before
+  // we subscribe to any maps)
+  dout(4) << "Loading daemon metadata..." << dendl;
+  load_all_metadata();
+
+  // Preload config keys (`get` for plugins is to be a fast local
+  // operation, we we don't have to synchronize these later because
+  // all sets will come via mgr)
+  load_config();
+
+  // Start Objecter and wait for OSD map
+  objecter->start();
+  objecter->wait_for_osd_map();
+  timer.init();
+
+  monc->sub_want("mgrdigest", 0, 0);
+
+  // Prepare to receive FSMap and request it
+  dout(4) << "requesting FSMap..." << dendl;
+  C_SaferCond cond;
+  waiting_for_fs_map = &cond;
+  monc->sub_want("fsmap", 0, 0);
+  monc->renew_subs();
+
+  // Wait for FSMap
+  dout(4) << "waiting for FSMap..." << dendl;
+  lock.Unlock();
+  cond.wait();
+  lock.Lock();
+  waiting_for_fs_map = nullptr;
+  dout(4) << "Got FSMap." << dendl;
+
+  // Wait for MgrDigest...?
+  // TODO
+
+  finisher.start();
+
+  dout(4) << "Complete." << dendl;
+  return 0;
+}
+
+void Mgr::load_all_metadata()
+{
+  assert(lock.is_locked_by_me());
+
+  JSONCommand mds_cmd;
+  mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
+  JSONCommand osd_cmd;
+  osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
+  JSONCommand mon_cmd;
+  mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
+
+  mds_cmd.wait();
+  osd_cmd.wait();
+  mon_cmd.wait();
+
+  assert(mds_cmd.r == 0);
+  assert(mon_cmd.r == 0);
+  assert(osd_cmd.r == 0);
+
+  for (auto &metadata_val : mds_cmd.json_result.get_array()) {
+    json_spirit::mObject daemon_meta = metadata_val.get_obj();
+    if (daemon_meta.count("hostname") == 0) {
+      dout(1) << "Skipping incomplete metadata entry" << dendl;
+      continue;
+    }
+
+    DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+    dm->key = DaemonKey(CEPH_ENTITY_TYPE_MDS,
+                        daemon_meta.at("name").get_str());
+    dm->hostname = daemon_meta.at("hostname").get_str();
+
+    daemon_meta.erase("name");
+    daemon_meta.erase("hostname");
+
+    for (const auto &i : daemon_meta) {
+      dm->metadata[i.first] = i.second.get_str();
+    }
+
+    daemon_state.insert(dm);
+  }
+
+  for (auto &metadata_val : mon_cmd.json_result.get_array()) {
+    json_spirit::mObject daemon_meta = metadata_val.get_obj();
+    if (daemon_meta.count("hostname") == 0) {
+      dout(1) << "Skipping incomplete metadata entry" << dendl;
+      continue;
+    }
+
+    DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+    dm->key = DaemonKey(CEPH_ENTITY_TYPE_MON,
+                        daemon_meta.at("name").get_str());
+    dm->hostname = daemon_meta.at("hostname").get_str();
+
+    daemon_meta.erase("name");
+    daemon_meta.erase("hostname");
+
+    for (const auto &i : daemon_meta) {
+      dm->metadata[i.first] = i.second.get_str();
+    }
+
+    daemon_state.insert(dm);
+  }
+
+  for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
+    json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
+    if (osd_metadata.count("hostname") == 0) {
+      dout(1) << "Skipping incomplete metadata entry" << dendl;
+      continue;
+    }
+    dout(4) << osd_metadata.at("hostname").get_str() << dendl;
+
+    DaemonMetadataPtr dm = std::make_shared<DaemonMetadata>(daemon_state.types);
+    dm->key = DaemonKey(CEPH_ENTITY_TYPE_OSD,
+                        stringify(osd_metadata.at("id").get_int()));
+    dm->hostname = osd_metadata.at("hostname").get_str();
+
+    osd_metadata.erase("id");
+    osd_metadata.erase("hostname");
+
+    for (const auto &i : osd_metadata) {
+      dm->metadata[i.first] = i.second.get_str();
+    }
+
+    daemon_state.insert(dm);
+  }
+}
+
+void Mgr::load_config()
+{
+  assert(lock.is_locked_by_me());
+
+  dout(10) << "listing keys" << dendl;
+  JSONCommand cmd;
+  cmd.run(monc, "{\"prefix\": \"config-key list\"}");
+
+  cmd.wait();
+  assert(cmd.r == 0);
+
+  std::map<std::string, std::string> loaded;
+  
+  for (auto &key_str : cmd.json_result.get_array()) {
+    std::string const key = key_str.get_str();
+    dout(20) << "saw key '" << key << "'" << dendl;
+
+    const std::string config_prefix = PyModules::config_prefix;
+
+    if (key.substr(0, config_prefix.size()) == config_prefix) {
+      dout(20) << "fetching '" << key << "'" << dendl;
+      Command get_cmd;
+      std::ostringstream cmd_json;
+      cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
+      get_cmd.run(monc, cmd_json.str());
+      get_cmd.wait();
+      assert(get_cmd.r == 0);
+
+      loaded[key] = get_cmd.outbl.to_str();
+    }
+  }
+
+  py_modules.insert_config(loaded);
+}
+
+
+void Mgr::shutdown()
+{
+  // First stop the server so that we're not taking any more incoming requests
+  server.shutdown();
+
+  // Then stop the finisher to ensure its enqueued contexts aren't going
+  // to touch references to the things we're about to tear down
+  finisher.stop();
+
+  lock.Lock();
+  timer.shutdown();
+  objecter->shutdown();
+  lock.Unlock();
+
+  monc->shutdown();
+  client_messenger->shutdown();
+  client_messenger->wait();
+}
+
+void Mgr::handle_osd_map()
+{
+  assert(lock.is_locked_by_me());
+
+  std::set<std::string> names_exist;
+
+  /**
+   * When we see a new OSD map, inspect the entity addrs to
+   * see if they have changed (service restart), and if so
+   * reload the metadata.
+   */
+  objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
+    for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
+      if (!osd_map.exists(osd_id)) {
+        continue;
+      }
+
+      // Remember which OSDs exist so that we can cull any that don't
+      names_exist.insert(stringify(osd_id));
+
+      // Consider whether to update the daemon metadata (new/restarted daemon)
+      bool update_meta = false;
+      const auto k = DaemonKey(CEPH_ENTITY_TYPE_OSD, stringify(osd_id));
+      if (daemon_state.is_updating(k)) {
+        continue;
+      }
+
+      if (daemon_state.exists(k)) {
+        auto metadata = daemon_state.get(k);
+        auto addr_iter = metadata->metadata.find("front_addr");
+        if (addr_iter != metadata->metadata.end()) {
+          const std::string &metadata_addr = addr_iter->second;
+          const auto &map_addr = osd_map.get_addr(osd_id);
+
+          if (metadata_addr != stringify(map_addr)) {
+            dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
+                    << " != " << stringify(map_addr) << dendl;
+            update_meta = true;
+          } else {
+            dout(20) << "OSD[" << osd_id << "] addr unchanged: "
+                     << metadata_addr << dendl;
+          }
+        } else {
+          // Awkward case where daemon went into DaemonState because it
+          // sent us a report but its metadata didn't get loaded yet
+          update_meta = true;
+        }
+      } else {
+        update_meta = true;
+      }
+
+      if (update_meta) {
+        daemon_state.notify_updating(k);
+        auto c = new MetadataUpdate(daemon_state, k);
+        std::ostringstream cmd;
+        cmd << "{\"prefix\": \"osd metadata\", \"id\": "
+            << osd_id << "}";
+        int r = monc->start_mon_command(
+            {cmd.str()},
+            {}, &c->outbl, &c->outs, c);
+        assert(r == 0);  // start_mon_command defined to not fail
+      }
+    }
+  });
+
+  // TODO: same culling for MonMap and FSMap
+  daemon_state.cull(CEPH_ENTITY_TYPE_OSD, names_exist);
+}
+
+bool Mgr::ms_dispatch(Message *m)
+{
+  derr << *m << dendl;
+  Mutex::Locker l(lock);
+
+  switch (m->get_type()) {
+    case MSG_MGR_DIGEST:
+      handle_mgr_digest(static_cast<MMgrDigest*>(m));
+      break;
+    case CEPH_MSG_MON_MAP:
+      // FIXME: we probably never get called here because MonClient
+      // has consumed the message.  For consuming OSDMap we need
+      // to be the tail dispatcher, but to see MonMap we would
+      // need to be at the head.
+      // Result is that ClusterState has access to monmap (it reads
+      // from monclient anyway), but we don't see notifications.  Hook
+      // into MonClient to get notifications instead of messing
+      // with message delivery to achieve it?
+      assert(0);
+
+      py_modules.notify_all("mon_map", "");
+      break;
+    case CEPH_MSG_FS_MAP:
+      py_modules.notify_all("fs_map", "");
+      handle_fs_map((MFSMap*)m);
+      m->put();
+      break;
+    case CEPH_MSG_OSD_MAP:
+
+      handle_osd_map();
+
+      py_modules.notify_all("osd_map", "");
+
+      // Continuous subscribe, so that we can generate notifications
+      // for our MgrPyModules
+      objecter->maybe_request_map();
+      m->put();
+      break;
+
+    default:
+      return false;
+  }
+  return true;
+}
+
+
+void Mgr::handle_fs_map(MFSMap* m)
+{
+  assert(lock.is_locked_by_me());
+
+  const FSMap &new_fsmap = m->get_fsmap();
+
+  if (waiting_for_fs_map) {
+    waiting_for_fs_map->complete(0);
+    waiting_for_fs_map = NULL;
+  }
+
+  // TODO: callers (e.g. from python land) are potentially going to see
+  // the new fsmap before we've bothered populating all the resulting
+  // daemon_state.  Maybe we should block python land while we're making
+  // this kind of update?
+  
+  cluster_state.set_fsmap(new_fsmap);
+
+  auto mds_info = new_fsmap.get_mds_info();
+  for (const auto &i : mds_info) {
+    const auto &info = i.second;
+
+    const auto k = DaemonKey(CEPH_ENTITY_TYPE_MDS, info.name);
+    if (daemon_state.is_updating(k)) {
+      continue;
+    }
+
+    bool update = false;
+    if (daemon_state.exists(k)) {
+      auto metadata = daemon_state.get(k);
+      // FIXME: nothing stopping old daemons being here, they won't have
+      // addr: need to handle case of pre-ceph-mgr daemons that don't have
+      // the fields we expect
+      auto metadata_addr = metadata->metadata.at("addr");
+      const auto map_addr = info.addr;
+
+      if (metadata_addr != stringify(map_addr)) {
+        dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
+                << " != " << stringify(map_addr) << dendl;
+        update = true;
+      } else {
+        dout(20) << "MDS[" << info.name << "] addr unchanged: "
+                 << metadata_addr << dendl;
+      }
+    } else {
+      update = true;
+    }
+
+    if (update) {
+      daemon_state.notify_updating(k);
+      auto c = new MetadataUpdate(daemon_state, k);
+      std::ostringstream cmd;
+      cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
+          << info.name << "\"}";
+      int r = monc->start_mon_command(
+          {cmd.str()},
+          {}, &c->outbl, &c->outs, c);
+      assert(r == 0);  // start_mon_command defined to not fail
+    }
+  }
+}
+
+
+bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new)
+{
+  if (dest_type == CEPH_ENTITY_TYPE_MON)
+    return true;
+
+  if (force_new) {
+    if (monc->wait_auth_rotating(10) < 0)
+      return false;
+  }
+
+  *authorizer = monc->auth->build_authorizer(dest_type);
+  return *authorizer != NULL;
+}
+
+
+int Mgr::main(vector<const char *> args)
+{
+  return py_modules.main(args);
+}
+
+
+void Mgr::handle_mgr_digest(MMgrDigest* m)
+{
+  dout(10) << m->mon_status_json.length() << dendl;
+  dout(10) << m->health_json.length() << dendl;
+  cluster_state.load_digest(m);
+  py_modules.notify_all("mon_status", "");
+  py_modules.notify_all("health", "");
+
+  // Hack: use this as a tick/opportunity to prompt python-land that
+  // the pgmap might have changed since last time we were here.
+  py_modules.notify_all("pg_summary", "");
+  
+  m->put();
+}
+
diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h
new file mode 100644 (file)
index 0000000..bc33c6a
--- /dev/null
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef CEPH_PYFOO_H_
+#define CEPH_PYFOO_H_
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+// Python's pyconfig-64.h conflicts with ceph's acconfig.h
+#undef HAVE_SYS_WAIT_H
+#undef HAVE_UNISTD_H
+#undef HAVE_UTIME_H
+#undef _POSIX_C_SOURCE
+#undef _XOPEN_SOURCE
+
+#include "osdc/Objecter.h"
+#include "mds/FSMap.h"
+#include "messages/MFSMap.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+#include "auth/Auth.h"
+#include "common/Finisher.h"
+#include "common/Timer.h"
+
+#include "DaemonServer.h"
+#include "PyModules.h"
+
+#include "DaemonMetadata.h"
+#include "ClusterState.h"
+
+class MCommand;
+class MMgrDigest;
+
+
+class MgrPyModule;
+
+class Mgr : public Dispatcher {
+protected:
+  Objecter  *objecter;
+  MonClient *monc;
+  Messenger *client_messenger;
+
+  Mutex lock;
+  SafeTimer timer;
+  Finisher finisher;
+
+  Context *waiting_for_fs_map;
+
+  PyModules py_modules;
+  DaemonMetadataIndex daemon_state;
+  ClusterState cluster_state;
+
+  DaemonServer server;
+
+  void load_config();
+  void load_all_metadata();
+
+public:
+  Mgr();
+  ~Mgr();
+
+  void handle_mgr_digest(MMgrDigest* m);
+  void handle_fs_map(MFSMap* m);
+  void handle_osd_map();
+  bool ms_dispatch(Message *m);
+  bool ms_handle_reset(Connection *con) { return false; }
+  void ms_handle_remote_reset(Connection *con) {}
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new);
+  int init();
+  void shutdown();
+  void usage() {}
+  int main(vector<const char *> args);
+};
+
+#endif /* MDS_UTILITY_H_ */
diff --git a/src/mgr/MgrContext.h b/src/mgr/MgrContext.h
new file mode 100644 (file)
index 0000000..eae7d75
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef MGR_CONTEXT_H_
+#define MGR_CONTEXT_H_
+
+#include <memory>
+#include "include/Context.h"
+
+#include "common/ceph_json.h"
+#include "mon/MonClient.h"
+
+class C_StdFunction : public Context
+{
+private:
+  std::function<void()> fn;
+
+public:
+  C_StdFunction(std::function<void()> fn_)
+    : fn(fn_)
+  {}
+
+  void finish(int r)
+  {
+    fn();
+  }
+};
+
+class Command
+{
+protected:
+  C_SaferCond cond;
+public:
+  bufferlist outbl;
+  std::string outs;
+  int r;
+
+  void run(MonClient *monc, const std::string &command)
+  {
+    monc->start_mon_command({command}, {},
+        &outbl, &outs, &cond);
+  }
+
+  virtual void wait()
+  {
+    r = cond.wait();
+  }
+
+  virtual ~Command() {}
+};
+
+
+class JSONCommand : public Command
+{
+public:
+  json_spirit::mValue json_result;
+
+  void wait()
+  {
+    Command::wait();
+
+    if (r == 0) {
+      bool read_ok = json_spirit::read(
+          outbl.to_str(), json_result);
+      if (!read_ok) {
+        r = -EINVAL;
+      }
+    }
+  }
+};
+
+#endif
+
diff --git a/src/mgr/MgrPyModule.cc b/src/mgr/MgrPyModule.cc
new file mode 100644 (file)
index 0000000..c1dee9f
--- /dev/null
@@ -0,0 +1,201 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#include "PyFormatter.h"
+
+#include "common/debug.h"
+
+#include "MgrPyModule.h"
+
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+MgrPyModule::MgrPyModule(const std::string &module_name_)
+  : module_name(module_name_), pModule(nullptr), pClass(nullptr),
+    pClassInstance(nullptr)
+{}
+
+MgrPyModule::~MgrPyModule()
+{
+  Py_XDECREF(pModule);
+  Py_XDECREF(pClass);
+  Py_XDECREF(pClassInstance);
+}
+
+int MgrPyModule::load()
+{
+  // Load the module
+  PyObject *pName = PyString_FromString(module_name.c_str());
+  pModule = PyImport_Import(pName);
+  Py_DECREF(pName);
+  if (pModule == nullptr) {
+    derr << "Module not found: '" << module_name << "'" << dendl;
+    return -ENOENT;
+  }
+
+  // Find the class
+  // TODO: let them call it what they want instead of just 'Module'
+  pClass = PyObject_GetAttrString(pModule, (const char*)"Module");
+  if (pClass == nullptr) {
+    derr << "Class not found in module '" << module_name << "'" << dendl;
+    return -EINVAL;
+  }
+
+  
+  // Just using the module name as the handle, replace with a
+  // uuidish thing if needed
+  auto pyHandle = PyString_FromString(module_name.c_str());
+  auto pArgs = PyTuple_Pack(1, pyHandle);
+  pClassInstance = PyObject_CallObject(pClass, pArgs);
+  if (pClassInstance == nullptr) {
+    derr << "Failed to construct class in '" << module_name << "'" << dendl;
+    return -EINVAL;
+  } else {
+    dout(1) << "Constructed class from module: " << module_name << dendl;
+  }
+  Py_DECREF(pArgs);
+
+  return load_commands();
+}
+
+int MgrPyModule::serve()
+{
+  assert(pClassInstance != nullptr);
+
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+
+  auto pValue = PyObject_CallMethod(pClassInstance,
+      (const char*)"serve", (const char*)"()");
+
+  if (pValue != NULL) {
+    Py_DECREF(pValue);
+  } else {
+    PyErr_Print();
+    return -EINVAL;
+  }
+
+  PyGILState_Release(gstate);
+
+  return 0;
+}
+
+void MgrPyModule::notify(const std::string &notify_type, const std::string &notify_id)
+{
+  assert(pClassInstance != nullptr);
+
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+
+  // Execute
+  auto pValue = PyObject_CallMethod(pClassInstance, (const char*)"notify", "(ss)",
+       notify_type.c_str(), notify_id.c_str());
+
+  if (pValue != NULL) {
+    Py_DECREF(pValue);
+  } else {
+    PyErr_Print();
+    // FIXME: callers can't be expected to handle a python module
+    // that has spontaneously broken, but Mgr() should provide
+    // a hook to unload misbehaving modules when they have an
+    // error somewhere like this
+  }
+
+  PyGILState_Release(gstate);
+}
+
+int MgrPyModule::load_commands()
+{
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+
+  PyObject *command_list = PyObject_GetAttrString(pClassInstance, "COMMANDS");
+  assert(command_list != nullptr);
+  const size_t list_size = PyList_Size(command_list);
+  for (size_t i = 0; i < list_size; ++i) {
+    PyObject *command = PyList_GetItem(command_list, i);
+    assert(command != nullptr);
+
+    ModuleCommand item;
+
+    PyObject *pCmd = PyDict_GetItemString(command, "cmd");
+    assert(pCmd != nullptr);
+    item.cmdstring = PyString_AsString(pCmd);
+
+    dout(20) << "loaded command " << item.cmdstring << dendl;
+
+    PyObject *pDesc = PyDict_GetItemString(command, "desc");
+    assert(pDesc != nullptr);
+    item.helpstring = PyString_AsString(pDesc);
+
+    PyObject *pPerm = PyDict_GetItemString(command, "perm");
+    assert(pPerm != nullptr);
+    item.perm = PyString_AsString(pPerm);
+
+    item.handler = this;
+
+    commands.push_back(item);
+  }
+  Py_DECREF(command_list);
+
+  PyGILState_Release(gstate);
+
+  dout(10) << "loaded " << commands.size() << " commands" << dendl;
+
+  return 0;
+}
+
+int MgrPyModule::handle_command(
+  const cmdmap_t &cmdmap,
+  std::stringstream *ss,
+  std::stringstream *ds)
+{
+  assert(ss != nullptr);
+  assert(ds != nullptr);
+
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+
+  PyFormatter f;
+  cmdmap_dump(cmdmap, &f);
+  PyObject *py_cmd = f.get();
+
+  auto pResult = PyObject_CallMethod(pClassInstance,
+      (const char*)"handle_command", (const char*)"(O)", py_cmd);
+
+  Py_DECREF(py_cmd);
+
+  int r = 0;
+  if (pResult != NULL) {
+    if (PyTuple_Size(pResult) != 3) {
+      r = -EINVAL;
+    } else {
+      r = PyInt_AsLong(PyTuple_GetItem(pResult, 0));
+      *ds << PyString_AsString(PyTuple_GetItem(pResult, 1));
+      *ss << PyString_AsString(PyTuple_GetItem(pResult, 2));
+    }
+
+    Py_DECREF(pResult);
+  } else {
+    PyErr_Print();
+    r = -EINVAL;
+  }
+
+  PyGILState_Release(gstate);
+
+  return r;
+}
+
diff --git a/src/mgr/MgrPyModule.h b/src/mgr/MgrPyModule.h
new file mode 100644 (file)
index 0000000..8a5dc94
--- /dev/null
@@ -0,0 +1,74 @@
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#ifndef MGR_PY_MODULE_H_
+#define MGR_PY_MODULE_H_
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+
+#include "common/cmdparse.h"
+
+#include <vector>
+#include <string>
+
+
+class MgrPyModule;
+
+/**
+ * A Ceph CLI command description provided from a Python module
+ */
+class ModuleCommand {
+public:
+  std::string cmdstring;
+  std::string helpstring;
+  std::string perm;
+  MgrPyModule *handler;
+};
+
+class MgrPyModule
+{
+private:
+  const std::string module_name;
+  PyObject *pModule;
+  PyObject *pClass;
+  PyObject *pClassInstance;
+
+
+  std::vector<ModuleCommand> commands;
+
+  int load_commands();
+
+public:
+  MgrPyModule(const std::string &module_name);
+  ~MgrPyModule();
+
+  int load();
+  int serve();
+  void notify(const std::string &notify_type, const std::string &notify_id);
+
+  const std::vector<ModuleCommand> &get_commands() const
+  {
+    return commands;
+  }
+
+  int handle_command(
+    const cmdmap_t &cmdmap,
+    std::stringstream *ss,
+    std::stringstream *ds);
+};
+
+#endif
+
diff --git a/src/mgr/PyFormatter.cc b/src/mgr/PyFormatter.cc
new file mode 100644 (file)
index 0000000..55154c7
--- /dev/null
@@ -0,0 +1,123 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat Inc
+ *
+ * Author: John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#include "PyFormatter.h"
+
+
+void PyFormatter::open_array_section(const char *name)
+{
+  PyObject *list = PyList_New(0);
+  dump_pyobject(name, list);
+  stack.push(cursor);
+  cursor = list;
+}
+
+void PyFormatter::open_object_section(const char *name)
+{
+  PyObject *dict = PyDict_New();
+  dump_pyobject(name, dict);
+  stack.push(cursor);
+  cursor = dict;
+}
+
+void PyFormatter::dump_unsigned(const char *name, uint64_t u)
+{
+  PyObject *p = PyLong_FromLongLong(u);
+  assert(p);
+  dump_pyobject(name, p);
+}
+
+void PyFormatter::dump_int(const char *name, int64_t u)
+{
+  PyObject *p = PyLong_FromLongLong(u);
+  assert(p);
+  dump_pyobject(name, p);
+}
+
+void PyFormatter::dump_float(const char *name, double d)
+{
+  dump_pyobject(name, PyFloat_FromDouble(d));
+}
+
+void PyFormatter::dump_string(const char *name, const std::string& s)
+{
+  dump_pyobject(name, PyString_FromString(s.c_str()));
+}
+
+void PyFormatter::dump_bool(const char *name, bool b)
+{
+  if (b) {
+    Py_INCREF(Py_True);
+    dump_pyobject(name, Py_True);
+  } else {
+    Py_INCREF(Py_False);
+    dump_pyobject(name, Py_False);
+  }
+}
+
+std::ostream& PyFormatter::dump_stream(const char *name)
+{
+  // Give the caller an ostream, construct a PyString,
+  // and remember the association between the two.  On flush,
+  // we'll read from the ostream into the PyString
+  auto ps = std::make_shared<PendingStream>();
+  ps->cursor = cursor;
+  ps->name = name;
+
+  pending_streams.push_back(ps);
+
+  return ps->stream;
+}
+
+void PyFormatter::dump_format_va(const char *name, const char *ns, bool quoted, const char *fmt, va_list ap)
+{
+  // TODO
+  assert(0);
+}
+
+/**
+ * Steals reference to `p`
+ */
+void PyFormatter::dump_pyobject(const char *name, PyObject *p)
+{
+  if (PyList_Check(cursor)) {
+    PyList_Append(cursor, p);
+    Py_DECREF(p);
+  } else if (PyDict_Check(cursor)) {
+    PyObject *key = PyString_FromString(name);
+    PyDict_SetItem(cursor, key, p);
+    Py_DECREF(key);
+    Py_DECREF(p);
+  } else {
+    assert(0);
+  }
+}
+
+void PyFormatter::finish_pending_streams()
+{
+  for (const auto &i : pending_streams) {
+    PyObject *tmp_cur = cursor;
+    cursor = i->cursor;
+    dump_pyobject(
+        i->name.c_str(),
+        PyString_FromString(i->stream.str().c_str()));
+    cursor = tmp_cur;
+  }
+
+  pending_streams.clear();
+}
+
diff --git a/src/mgr/PyFormatter.h b/src/mgr/PyFormatter.h
new file mode 100644 (file)
index 0000000..48dd5e6
--- /dev/null
@@ -0,0 +1,142 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat Inc
+ *
+ * Author: John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef PY_FORMATTER_H_
+#define PY_FORMATTER_H_
+
+// Python.h comes first because otherwise it clobbers ceph's assert
+#include "Python.h"
+// Python's pyconfig-64.h conflicts with ceph's acconfig.h
+#undef HAVE_SYS_WAIT_H
+#undef HAVE_UNISTD_H
+#undef HAVE_UTIME_H
+#undef _POSIX_C_SOURCE
+#undef _XOPEN_SOURCE
+
+#include <stack>
+#include <memory>
+#include <list>
+
+#include "common/Formatter.h"
+
+class PyFormatter : public ceph::Formatter
+{
+public:
+  PyFormatter(bool pretty = false, bool array = false)
+  {
+    // Initialise cursor to an empty dict
+    if (!array) {
+      root = cursor = PyDict_New();
+    } else {
+      root = cursor = PyList_New(0);
+    }
+  }
+
+  ~PyFormatter()
+  {
+    cursor = NULL;
+    Py_DECREF(root);
+    root = NULL;
+  }
+
+  // Obscure, don't care.
+  void open_array_section_in_ns(const char *name, const char *ns)
+  {assert(0);}
+  void open_object_section_in_ns(const char *name, const char *ns)
+  {assert(0);}
+
+  void reset()
+  {
+    const bool array = PyList_Check(root);
+    Py_DECREF(root);
+    if (array) {
+      root = cursor = PyList_New(0);
+    } else {
+      root = cursor = PyDict_New();
+    }
+  }
+
+  virtual void set_status(int status, const char* status_name) {}
+  virtual void output_header() {};
+  virtual void output_footer() {};
+
+
+  virtual void open_array_section(const char *name);
+  void open_object_section(const char *name);
+  void close_section()
+  {
+    assert(cursor != root);
+    assert(!stack.empty());
+    cursor = stack.top();
+    stack.pop();
+  }
+  void dump_bool(const char *name, bool b);
+  void dump_unsigned(const char *name, uint64_t u);
+  void dump_int(const char *name, int64_t u);
+  void dump_float(const char *name, double d);
+  void dump_string(const char *name, const std::string& s);
+  std::ostream& dump_stream(const char *name);
+  void dump_format_va(const char *name, const char *ns, bool quoted, const char *fmt, va_list ap);
+
+  void flush(std::ostream& os)
+  {
+      // This class is not a serializer: this doens't make sense
+      assert(0);
+  }
+
+  int get_len() const
+  {
+      // This class is not a serializer: this doens't make sense
+      assert(0);
+      return 0;
+  }
+
+  void write_raw_data(const char *data)
+  {
+      // This class is not a serializer: this doens't make sense
+      assert(0);
+  }
+
+  PyObject *get()
+  {
+    finish_pending_streams();
+
+    Py_INCREF(root);
+    return root;
+  }
+
+  void finish_pending_streams();
+
+private:
+  PyObject *root;
+  PyObject *cursor;
+  std::stack<PyObject *> stack;
+
+  void dump_pyobject(const char *name, PyObject *p);
+
+  class PendingStream {
+    public:
+    PyObject *cursor;
+    std::string name;
+    std::stringstream stream;
+  };
+
+  std::list<std::shared_ptr<PendingStream> > pending_streams;
+
+};
+
+#endif
+
diff --git a/src/mgr/PyModules.cc b/src/mgr/PyModules.cc
new file mode 100644 (file)
index 0000000..82fd2c1
--- /dev/null
@@ -0,0 +1,389 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#include "PyState.h"
+#include "PyFormatter.h"
+
+#include "osd/OSDMap.h"
+#include "mon/MonMap.h"
+
+#include "mgr/MgrContext.h"
+
+#include "PyModules.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+void PyModules::dump_server(const std::string &hostname,
+                      const DaemonMetadataCollection &dmc,
+                      Formatter *f)
+{
+  f->dump_string("hostname", hostname);
+  f->open_array_section("services");
+  std::string ceph_version;
+
+  for (const auto &i : dmc) {
+    const auto &key = i.first;
+    const std::string str_type = ceph_entity_type_name(key.first);
+    const std::string &svc_name = key.second;
+
+    // TODO: pick the highest version, and make sure that
+    // somewhere else (during health reporting?) we are
+    // indicating to the user if we see mixed versions
+    ceph_version = i.second->metadata.at("ceph_version");
+
+    f->open_object_section("service");
+    f->dump_string("type", str_type);
+    f->dump_string("id", svc_name);
+    f->close_section();
+  }
+  f->close_section();
+
+  f->dump_string("ceph_version", ceph_version);
+}
+
+
+
+PyObject *PyModules::get_server_python(const std::string &hostname)
+{
+  PyThreadState *tstate = PyEval_SaveThread();
+  Mutex::Locker l(lock);
+  PyEval_RestoreThread(tstate);
+  dout(10) << " (" << hostname << ")" << dendl;
+
+  auto dmc = daemon_state.get_by_server(hostname);
+
+  PyFormatter f;
+  dump_server(hostname, dmc, &f);
+  return f.get();
+}
+
+
+PyObject *PyModules::list_servers_python()
+{
+  PyThreadState *tstate = PyEval_SaveThread();
+  Mutex::Locker l(lock);
+  PyEval_RestoreThread(tstate);
+  dout(10) << " >" << dendl;
+
+  PyFormatter f(false, true);
+  const auto &all = daemon_state.get_all_servers();
+  for (const auto &i : all) {
+    const auto &hostname = i.first;
+
+    f.open_object_section("server");
+    dump_server(hostname, i.second, &f);
+    f.close_section();
+  }
+
+  return f.get();
+}
+
+PyObject *PyModules::get_metadata_python(std::string const &handle,
+    entity_type_t svc_type, const std::string &svc_id)
+{
+  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+  PyFormatter f;
+  f.dump_string("hostname", metadata->hostname);
+  for (const auto &i : metadata->metadata) {
+    f.dump_string(i.first.c_str(), i.second);
+  }
+
+  return f.get();
+}
+
+
+PyObject *PyModules::get_python(const std::string &what)
+{
+  PyThreadState *tstate = PyEval_SaveThread();
+  Mutex::Locker l(lock);
+  PyEval_RestoreThread(tstate);
+
+  if (what == "fs_map") {
+    PyFormatter f;
+    cluster_state.with_fsmap([&f](const FSMap &fsmap) {
+      fsmap.dump(&f);
+    });
+    return f.get();
+  } else if (what == "osdmap_crush_map_text") {
+    bufferlist rdata;
+    cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
+      osd_map.crush->encode(rdata);
+    });
+    std::string crush_text = rdata.to_str();
+    return PyString_FromString(crush_text.c_str());
+  } else if (what.substr(0, 7) == "osd_map") {
+    PyFormatter f;
+    cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
+      if (what == "osd_map") {
+        osd_map.dump(&f);
+      } else if (what == "osd_map_tree") {
+        osd_map.print_tree(&f, nullptr);
+      } else if (what == "osd_map_crush") {
+        osd_map.crush->dump(&f);
+      }
+    });
+    return f.get();
+  } else if (what == "config") {
+    PyFormatter f;
+    g_conf->show_config(&f);
+    return f.get();
+  } else if (what == "mon_map") {
+    PyFormatter f;
+    cluster_state.with_monmap(
+      [&f](const MonMap &monmap) {
+        monmap.dump(&f);
+      }
+    );
+    return f.get();
+  } else if (what == "fs_map") {
+    PyFormatter f;
+    cluster_state.with_fsmap(
+      [&f](const FSMap &fsmap) {
+        fsmap.dump(&f);
+      }
+    );
+    return f.get();
+  } else if (what == "osd_metadata") {
+    PyFormatter f;
+    auto dmc = daemon_state.get_by_type(CEPH_ENTITY_TYPE_OSD);
+    for (const auto &i : dmc) {
+      f.open_object_section(i.first.second.c_str());
+      f.dump_string("hostname", i.second->hostname);
+      for (const auto &j : i.second->metadata) {
+        f.dump_string(j.first.c_str(), j.second);
+      }
+      f.close_section();
+    }
+    return f.get();
+  } else if (what == "pg_summary" || what == "health" || what == "mon_status") {
+    PyFormatter f;
+    bufferlist json;
+    if (what == "pg_summary") {
+      json = cluster_state.get_pg_summary();
+    } else if (what == "health") {
+      json = cluster_state.get_health();
+    } else if (what == "mon_status") {
+      json = cluster_state.get_mon_status();
+    } else {
+      assert(false);
+    }
+    f.dump_string("json", json.to_str());
+    return f.get();
+  } else {
+    derr << "Python module requested unknown data '" << what << "'" << dendl;
+    Py_RETURN_NONE;
+  }
+}
+
+//XXX courtesy of http://stackoverflow.com/questions/1418015/how-to-get-python-exception-text
+#include <boost/python.hpp>
+// decode a Python exception into a string
+std::string handle_pyerror()
+{
+    using namespace boost::python;
+    using namespace boost;
+
+    PyObject *exc,*val,*tb;
+    object formatted_list, formatted;
+    PyErr_Fetch(&exc,&val,&tb);
+    handle<> hexc(exc),hval(allow_null(val)),htb(allow_null(tb)); 
+    object traceback(import("traceback"));
+    if (!tb) {
+        object format_exception_only(traceback.attr("format_exception_only"));
+        formatted_list = format_exception_only(hexc,hval);
+    } else {
+        object format_exception(traceback.attr("format_exception"));
+        formatted_list = format_exception(hexc,hval,htb);
+    }
+    formatted = str("\n").join(formatted_list);
+    return extract<std::string>(formatted);
+}
+
+int PyModules::main(vector<const char *> args)
+{
+  global_handle = this;
+
+  // Set up global python interpreter
+  Py_Initialize();
+
+  // Some python modules do not cope with an unpopulated argv, so lets
+  // fake one.  This step also picks up site-packages into sys.path.
+  const char *argv[] = {"ceph-mgr"};
+  PySys_SetArgv(1, (char**)argv);
+  
+  // Populate python namespace with callable hooks
+  Py_InitModule("ceph_state", CephStateMethods);
+
+  // Configure sys.path to include mgr_module_path
+  const std::string module_path = g_conf->mgr_module_path;
+  dout(4) << "Loading modules from '" << module_path << "'" << dendl;
+  std::string sys_path = Py_GetPath();
+
+  // We need site-packages for flask et al, unless we choose to
+  // embed them in the ceph package.  site-packages is an interpreter-specific
+  // thing, so as an embedded interpreter we're responsible for picking
+  // this.  FIXME: don't hardcode this.
+  std::string site_packages = "/usr/lib/python2.7/site-packages:/usr/lib64/python2.7/site-packages:/usr/lib64/python2.7";
+  sys_path += ":";
+  sys_path += site_packages;
+
+  sys_path += ":";
+  sys_path += module_path;
+  dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
+  PySys_SetPath((char*)(sys_path.c_str()));
+
+  // Let CPython know that we will be calling it back from other
+  // threads in future.
+  if (! PyEval_ThreadsInitialized()) {
+    PyEval_InitThreads();
+  }
+
+  // Load python code
+  // TODO load mgr_modules list, run them all in a thread each.
+  auto mod = new MgrPyModule("rest");
+  int r = mod->load();
+  if (r != 0) {
+    derr << "Error loading python module" << dendl;
+    derr << handle_pyerror() << dendl;
+#if 0
+    PyObject *ptype, *pvalue, *ptraceback;
+    PyErr_Fetch(&ptype, &pvalue, &ptraceback);
+    if (ptype) {
+      if (pvalue) {
+        char *pStrErrorMessage = PyString_AsString(pvalue);
+
+XXX why is pvalue giving null when converted to string?
+
+        assert(pStrErrorMessage != nullptr);
+        derr << "Exception: " << pStrErrorMessage << dendl;
+        Py_DECREF(ptraceback);
+        Py_DECREF(pvalue);
+      }
+      Py_DECREF(ptype);
+    }
+#endif
+
+    // FIXME: be tolerant of bad modules, log an error and continue
+    // to load other, healthy modules.
+    return r;
+  }
+  {
+    Mutex::Locker locker(lock);
+    modules["rest"] = mod;
+  }
+
+  // Execute python server
+  mod->serve();
+
+  {
+    Mutex::Locker locker(lock);
+    // Tear down modules
+    for (auto i : modules) {
+      delete i.second;
+    }
+    modules.clear();
+  }
+
+  Py_Finalize();
+  return 0;
+}
+
+void PyModules::notify_all(const std::string &notify_type,
+                     const std::string &notify_id)
+{
+  Mutex::Locker l(lock);
+
+  dout(10) << __func__ << ": notify_all " << notify_type << dendl;
+  for (auto i : modules) {
+    auto module = i.second;
+    // Send all python calls down a Finisher to avoid blocking
+    // C++ code, and avoid any potential lock cycles.
+    finisher.queue(new C_StdFunction([module, notify_type, notify_id](){
+      module->notify(notify_type, notify_id);
+    }));
+  }
+}
+
+bool PyModules::get_config(const std::string &handle,
+    const std::string &key, std::string *val) const
+{
+  const std::string global_key = config_prefix + handle + "." + key;
+
+  if (config_cache.count(global_key)) {
+    *val = config_cache.at(global_key);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void PyModules::set_config(const std::string &handle,
+    const std::string &key, const std::string &val)
+{
+  const std::string global_key = config_prefix + handle + "." + key;
+
+  config_cache[global_key] = val;
+
+  std::ostringstream cmd_json;
+  Command set_cmd;
+
+  JSONFormatter jf;
+  jf.open_object_section("cmd");
+  jf.dump_string("prefix", "config-key put");
+  jf.dump_string("key", global_key);
+  jf.dump_string("val", val);
+  jf.close_section();
+  jf.flush(cmd_json);
+
+  set_cmd.run(&monc, cmd_json.str());
+  set_cmd.wait();
+
+  // FIXME: is config-key put ever allowed to fail?
+  assert(set_cmd.r == 0);
+}
+
+std::vector<ModuleCommand> PyModules::get_commands()
+{
+  Mutex::Locker l(lock);
+
+  std::vector<ModuleCommand> result;
+  for (auto i : modules) {
+    auto module = i.second;
+    auto mod_commands = module->get_commands();
+    for (auto j : mod_commands) {
+      result.push_back(j);
+    }
+  }
+
+  return result;
+}
+
+void PyModules::insert_config(const std::map<std::string,
+                              std::string> &new_config)
+{
+  dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
+  config_cache = new_config;
+}
+
+void PyModules::log(const std::string &handle,
+    unsigned level, const std::string &record)
+{
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[" << handle << "] "
+  dout(level) << record << dendl;
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+}
diff --git a/src/mgr/PyModules.h b/src/mgr/PyModules.h
new file mode 100644 (file)
index 0000000..3020636
--- /dev/null
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef PY_MODULES_H_
+#define PY_MODULES_H_
+
+#include "MgrPyModule.h"
+
+#include "common/Finisher.h"
+#include "common/Mutex.h"
+
+
+#include "DaemonMetadata.h"
+#include "ClusterState.h"
+
+
+
+
+
+class PyModules
+{
+  protected:
+  std::map<std::string, MgrPyModule*> modules;
+
+  DaemonMetadataIndex &daemon_state;
+  ClusterState &cluster_state;
+  MonClient &monc;
+  Finisher &finisher;
+
+  Mutex lock;
+
+public:
+  static constexpr auto config_prefix = "mgr.";
+
+  PyModules(DaemonMetadataIndex &ds, ClusterState &cs, MonClient &mc,
+            Finisher &f)
+    : daemon_state(ds), cluster_state(cs), monc(mc), finisher(f),
+      lock("PyModules")
+  {
+  }
+
+  // FIXME: wrap for send_command?
+  MonClient &get_monc() {return monc;}
+
+  PyObject *get_python(const std::string &what);
+  PyObject *get_server_python(const std::string &hostname);
+  PyObject *list_servers_python();
+  PyObject *get_metadata_python(std::string const &handle,
+      entity_type_t svc_type, const std::string &svc_id);
+
+  std::map<std::string, std::string> config_cache;
+
+  std::vector<ModuleCommand> get_commands();
+
+  void insert_config(const std::map<std::string, std::string> &new_config);
+
+  // Public so that MonCommandCompletion can use it
+  // FIXME: bit weird that we're sending command completions
+  // to all modules (we rely on them to ignore anything that
+  // they don't recognise), but when we get called from
+  // python-land we don't actually know who we are.  Need
+  // to give python-land a handle in initialisation.
+  void notify_all(const std::string &notify_type,
+                  const std::string &notify_id);
+
+  int main(std::vector<const char *> args);
+
+  void dump_server(const std::string &hostname,
+                   const DaemonMetadataCollection &dmc,
+                   Formatter *f);
+
+  bool get_config(const std::string &handle,
+      const std::string &key, std::string *val) const;
+  void set_config(const std::string &handle,
+      const std::string &key, const std::string &val);
+
+  void log(const std::string &handle,
+           unsigned level, const std::string &record);
+};
+
+#endif
+
diff --git a/src/mgr/PyState.cc b/src/mgr/PyState.cc
new file mode 100644 (file)
index 0000000..6e1534d
--- /dev/null
@@ -0,0 +1,235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+/**
+ * The interface we present to python code that runs within
+ * ceph-mgr.
+ */
+
+#include "Mgr.h"
+
+#include "mon/MonClient.h"
+
+#include "PyState.h"
+
+PyModules *global_handle = NULL;
+
+
+class MonCommandCompletion : public Context
+{
+  PyObject *python_completion;
+  const std::string tag;
+
+public:
+  std::string outs;
+  bufferlist outbl;
+
+  MonCommandCompletion(PyObject* ev, const std::string &tag_)
+    : python_completion(ev), tag(tag_)
+  {
+    assert(python_completion != nullptr);
+    Py_INCREF(python_completion);
+  }
+
+  ~MonCommandCompletion()
+  {
+    Py_DECREF(python_completion);
+  }
+
+  void finish(int r)
+  {
+    PyGILState_STATE gstate;
+    gstate = PyGILState_Ensure();
+
+    auto set_fn = PyObject_GetAttrString(python_completion, "complete");
+    assert(set_fn != nullptr);
+
+    auto pyR = PyInt_FromLong(r);
+    auto pyOutBl = PyString_FromString(outbl.to_str().c_str());
+    auto pyOutS = PyString_FromString(outs.c_str());
+    auto args = PyTuple_Pack(3, pyR, pyOutBl, pyOutS);
+    Py_DECREF(pyR);
+    Py_DECREF(pyOutBl);
+    Py_DECREF(pyOutS);
+
+    auto rtn = PyObject_CallObject(set_fn, args);
+    if (rtn != nullptr) {
+      Py_DECREF(rtn);
+    }
+    Py_DECREF(args);
+
+    PyGILState_Release(gstate);
+
+    global_handle->notify_all("command", tag);
+  }
+};
+
+
+static PyObject*
+ceph_send_command(PyObject *self, PyObject *args)
+{
+  char *handle = nullptr;
+  char *cmd_json = nullptr;
+  char *tag = nullptr;
+  PyObject *completion = nullptr;
+  if (!PyArg_ParseTuple(args, "sOss:ceph_send_command",
+        &handle, &completion, &cmd_json, &tag)) {
+    return nullptr;
+  }
+
+  auto set_fn = PyObject_GetAttrString(completion, "complete");
+  if (set_fn == nullptr) {
+    assert(0);  // TODO raise python exception instead
+  } else {
+    assert(PyCallable_Check(set_fn));
+  }
+  Py_DECREF(set_fn);
+
+  auto c = new MonCommandCompletion(completion, tag);
+  auto r = global_handle->get_monc().start_mon_command(
+      {cmd_json},
+      {},
+      &c->outbl,
+      &c->outs,
+      c);
+  assert(r == 0);  // start_mon_command is forbidden to fail
+
+  Py_RETURN_NONE;
+}
+
+
+static PyObject*
+ceph_state_get(PyObject *self, PyObject *args)
+{
+  char *handle = nullptr;
+  char *what = NULL;
+  if (!PyArg_ParseTuple(args, "ss:ceph_state_get", &handle, &what)) {
+    return NULL;
+  }
+
+  return global_handle->get_python(what);
+}
+
+
+static PyObject*
+ceph_get_server(PyObject *self, PyObject *args)
+{
+  char *handle = nullptr;
+  char *hostname = NULL;
+  if (!PyArg_ParseTuple(args, "sz:ceph_get_server", &handle, &hostname)) {
+    return NULL;
+  }
+
+  if (hostname) {
+    return global_handle->get_server_python(hostname);
+  } else {
+    return global_handle->list_servers_python();
+  }
+}
+
+static PyObject*
+ceph_config_get(PyObject *self, PyObject *args)
+{
+  char *handle = nullptr;
+  char *what = nullptr;
+  if (!PyArg_ParseTuple(args, "ss:ceph_config_get", &handle, &what)) {
+    derr << "Invalid args!" << dendl;
+    return nullptr;
+  }
+
+  std::string value;
+  bool found = global_handle->get_config(handle, what, &value);
+  if (found) {
+    derr << "Found" << dendl;
+    return PyString_FromString(value.c_str());
+  } else {
+    derr << "Not found" << dendl;
+    Py_RETURN_NONE;
+  }
+}
+
+static PyObject*
+ceph_config_set(PyObject *self, PyObject *args)
+{
+  char *handle = nullptr;
+  char *key = nullptr;
+  char *value = nullptr;
+  if (!PyArg_ParseTuple(args, "sss:ceph_config_set", &handle, &key, &value)) {
+    return nullptr;
+  }
+
+  global_handle->set_config(handle, key, value);
+
+  Py_RETURN_NONE;
+}
+
+static PyObject*
+get_metadata(PyObject *self, PyObject *args)
+{
+  char *handle = nullptr;
+  char *type_str = NULL;
+  char *svc_id = NULL;
+  if (!PyArg_ParseTuple(args, "sss:get_metadata", &handle, &type_str, &svc_id)) {
+    return nullptr;
+  }
+
+  entity_type_t svc_type;
+  if (type_str == std::string("mds")) {
+    svc_type = CEPH_ENTITY_TYPE_MDS;
+  } else if (type_str == std::string("osd")) {
+    svc_type = CEPH_ENTITY_TYPE_OSD;
+  } else if (type_str == std::string("mon")) {
+    svc_type = CEPH_ENTITY_TYPE_MON;
+  } else {
+    // FIXME: form a proper exception
+    return nullptr;
+  }
+
+  return global_handle->get_metadata_python(handle, svc_type, svc_id);
+}
+
+static PyObject*
+ceph_log(PyObject *self, PyObject *args)
+{
+  int level = 0;
+  char *record = nullptr;
+  char *handle = nullptr;
+  if (!PyArg_ParseTuple(args, "sis:log", &handle, &level, &record)) {
+    return nullptr;
+  }
+
+  global_handle->log(handle, level, record);
+
+  Py_RETURN_NONE;
+}
+
+
+
+PyMethodDef CephStateMethods[] = {
+    {"get", ceph_state_get, METH_VARARGS,
+     "Get a cluster object"},
+    {"get_server", ceph_get_server, METH_VARARGS,
+     "Get a server object"},
+    {"get_metadata", get_metadata, METH_VARARGS,
+     "Get a service's metadata"},
+    {"send_command", ceph_send_command, METH_VARARGS,
+     "Send a mon command"},
+    {"get_config", ceph_config_get, METH_VARARGS,
+     "Get a configuration value"},
+    {"set_config", ceph_config_set, METH_VARARGS,
+     "Set a configuration value"},
+    {"log", ceph_log, METH_VARARGS,
+     "Emit a (local) log message"},
+    {NULL, NULL, 0, NULL}
+};
+
diff --git a/src/mgr/PyState.h b/src/mgr/PyState.h
new file mode 100644 (file)
index 0000000..e53296b
--- /dev/null
@@ -0,0 +1,12 @@
+#ifndef PYSTATE_H_
+#define PYSTATE_H_
+
+#include "Python.h"
+
+class PyModules;
+
+extern PyModules *global_handle;
+extern PyMethodDef CephStateMethods[];
+
+#endif
+