]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: flesh out standby/HA
authorJohn Spray <john.spray@redhat.com>
Thu, 14 Jul 2016 23:39:22 +0000 (00:39 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 29 Sep 2016 16:26:58 +0000 (17:26 +0100)
Signed-off-by: John Spray <john.spray@redhat.com>
16 files changed:
src/CMakeLists.txt
src/ceph_mgr.cc
src/common/config_opts.h
src/messages/MMgrBeacon.h
src/mgr/Mgr.cc
src/mgr/Mgr.h
src/mgr/MgrClient.cc
src/mgr/MgrClient.h
src/mgr/MgrStandby.cc [new file with mode: 0644]
src/mgr/MgrStandby.h [new file with mode: 0644]
src/mgr/PyModules.cc
src/mon/MgrMap.h
src/mon/MgrMonitor.cc
src/mon/MgrMonitor.h
src/mon/MonCommands.h
src/mon/Monitor.cc

index 5a73b26053fbd272d7a501393fba9883901749be..92a19e6295d7f9fd1847a1442a3ade5ad4f73a3e 100644 (file)
@@ -524,6 +524,7 @@ if (WITH_MGR)
       mgr/PyFormatter.cc
       mgr/PyState.cc
       mgr/MgrPyModule.cc
+      mgr/MgrStandby.cc
       mgr/Mgr.cc)
   add_executable(ceph-mgr ${mgr_srcs}
                  $<TARGET_OBJECTS:heap_profiler_objs>)
index 4b4c62a61b40cfed937e33ac3a9be29355f1cb74..c3d032c132685c0f040d82fe22475cbbd6d99edb 100644 (file)
@@ -14,7 +14,6 @@
  *
  */
 
-#include "mgr/Mgr.h"
 
 #include "include/types.h"
 #include "common/config.h"
 #include "common/errno.h"
 #include "global/global_init.h"
 
+#include "mgr/MgrStandby.h"
 
 
-
+/**
+ * A short main() which just instantiates a MgrStandby and
+ * hands over control to that.
+ */
 int main(int argc, const char **argv)
 {
   vector<const char*> args;
@@ -33,13 +36,12 @@ int main(int argc, const char **argv)
 
   global_init(NULL, args, CEPH_ENTITY_TYPE_MGR, CODE_ENVIRONMENT_DAEMON, 0,
               "mgr_data");
-  common_init_finish(g_ceph_context);
   // For consumption by KeyRing::from_ceph_context in MonClient
   g_conf->set_val("keyring", "$mgr_data/keyring", false);
 
-  Mgr mgr;
+  MgrStandby mgr;
 
-  // Handle --help before calling init() so we don't depend on network.
+  // Handle --help
   if ((args.size() == 1 && (std::string(args[0]) == "--help" || std::string(args[0]) == "-h"))) {
     mgr.usage();
     return 0;
@@ -49,14 +51,12 @@ int main(int argc, const char **argv)
   global_init_chdir(g_ceph_context);
   common_init_finish(g_ceph_context);
 
-  // Connect to mon cluster, download MDS map etc
   int rc = mgr.init();
   if (rc != 0) {
       std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl;
       return rc;
   }
 
-  // Finally, execute the user's commands
   return mgr.main(args);
 }
 
index 4f625db8174ac576759255230df1876e6472e81c..ed77cb093741f2f4ff212e2fff6564f1c1cb8c90 100644 (file)
@@ -1504,6 +1504,8 @@ OPTION(rgw_swift_versioning_enabled, OPT_BOOL, false) // whether swift object ve
 OPTION(mgr_module_path, OPT_STR, CEPH_PKGLIBDIR "/mgr") // where to load python modules from
 OPTION(mgr_modules, OPT_STR, "rest")  // Which modules to load
 OPTION(mgr_data, OPT_STR, "/var/lib/ceph/mgr/$cluster-$id") // where to find keyring etc
+OPTION(mgr_beacon_period, OPT_INT, 5)  // How frequently to send beacon
+OPTION(mon_mgr_beacon_grace, OPT_INT, 30)  // How long to wait to failover
 
 OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing
 
index e38227e24217fb14bbbac732c518320cf5349dad..b452a9dda7765f82f229d0ef818d58b12158a61d 100644 (file)
@@ -28,21 +28,27 @@ class MMgrBeacon : public PaxosServiceMessage {
 protected:
   uint64_t gid;
   entity_addr_t server_addr;
+  bool available;
+  std::string name;
 
 public:
   MMgrBeacon()
-    : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION)
+    : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION),
+      gid(0), available(false)
   {
   }
 
-  MMgrBeacon(uint64_t gid_, entity_addr_t server_addr_)
+  MMgrBeacon(uint64_t gid_, const std::string &name_,
+             entity_addr_t server_addr_, bool available_)
     : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION),
-      gid(gid_), server_addr(server_addr_)
+      gid(gid_), server_addr(server_addr_), available(available_), name(name_)
   {
   }
 
   uint64_t get_gid() const { return gid; }
   entity_addr_t get_server_addr() const { return server_addr; }
+  bool get_available() const { return available; }
+  const std::string& get_name() const { return name; }
 
 private:
   ~MMgrBeacon() {}
@@ -52,17 +58,24 @@ public:
   const char *get_type_name() const { return "mgrbeacon"; }
 
   void print(ostream& out) const {
-    out << get_type_name() << "(" << gid << ", " << server_addr << ")";
+    out << get_type_name() << " mgr." << name << "(" << gid << ", "
+        << server_addr << ", " << available << ")";
   }
 
   void encode_payload(uint64_t features) {
     paxos_encode();
     ::encode(server_addr, payload, features);
+    ::encode(gid, payload);
+    ::encode(available, payload);
+    ::encode(name, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     paxos_decode(p);
     ::decode(server_addr, p);
+    ::decode(gid, p);
+    ::decode(available, p);
+    ::decode(name, p);
   }
 };
 
index 751979993687ebf736aede9b2a2c3f6732d1f455..bcc27855296cbbf57e62d3e7ae83793b5cb59a3d 100644 (file)
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 
 
-Mgr::Mgr() :
-  Dispatcher(g_ceph_context),
+Mgr::Mgr(MonClient *monc_, Messenger *clientm_) :
+  monc(monc_),
   objecter(NULL),
-  monc(new MonClient(g_ceph_context)),
+  client_messenger(clientm_),
   lock("Mgr::lock"),
   timer(g_ceph_context, lock),
   finisher(g_ceph_context, "Mgr", "mgr-fin"),
   waiting_for_fs_map(NULL),
   py_modules(daemon_state, cluster_state, *monc, finisher),
   cluster_state(monc, nullptr),
-  server(monc, daemon_state, py_modules)
+  server(monc, daemon_state, py_modules),
+  initialized(false),
+  initializing(false)
 {
-  client_messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
-
-  // FIXME: using objecter as convenience to handle incremental
-  // OSD maps, but that's overkill.  We don't really need an objecter.
-  // Could we separate out the part of Objecter that we really need?
+  // Using Objecter to handle incremental decode of OSDMap
   objecter = new Objecter(g_ceph_context, client_messenger, monc, NULL, 0, 0);
 
   cluster_state.set_objecter(objecter);
@@ -59,9 +57,7 @@ Mgr::Mgr() :
 Mgr::~Mgr()
 {
   delete objecter;
-  delete monc;
-  delete client_messenger;
-  assert(waiting_for_fs_map == NULL);
+  assert(waiting_for_fs_map == nullptr);
 }
 
 
@@ -123,58 +119,35 @@ public:
 };
 
 
-
-int Mgr::init()
+void Mgr::background_init()
 {
   Mutex::Locker l(lock);
+  assert(!initializing);
+  assert(!initialized);
+  initializing = true;
 
-  // Initialize Messenger
-  int r = client_messenger->bind(g_conf->public_addr);
-  if (r < 0)
-    return r;
+  finisher.start();
 
-  client_messenger->start();
+  finisher.queue(new C_StdFunction([this](){
+    init();
+  }));
+}
+
+void Mgr::init()
+{
+  Mutex::Locker l(lock);
+  assert(initializing);
+  assert(!initialized);
 
   objecter->set_client_incarnation(0);
   objecter->init();
 
-  // Connect dispatchers before starting objecter
-  client_messenger->add_dispatcher_tail(objecter);
-  client_messenger->add_dispatcher_tail(this);
-
-  // Initialize MonClient
-  if (monc->build_initial_monmap() < 0) {
-    objecter->shutdown();
-    client_messenger->shutdown();
-    client_messenger->wait();
-    return -1;
-  }
-
-  monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
-      |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
-  monc->set_messenger(client_messenger);
-  monc->init();
-  r = monc->authenticate();
-  if (r < 0) {
-    derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
-    monc->shutdown();
-    objecter->shutdown();
-    client_messenger->shutdown();
-    client_messenger->wait();
-    return r;
-  }
-
-  client_t whoami = monc->get_global_id();
-  client_messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+  // Dispatcher before starting objecter
+  client_messenger->add_dispatcher_head(objecter);
 
   // Start communicating with daemons to learn statistics etc
   server.init(monc->get_global_id(), client_messenger->get_myaddr());
-
   dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
-  // TODO: send the beacon periodically
-  MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(),
-                                 server.get_myaddr());
-  monc->send_mon_message(m);
 
   // Preload all daemon metadata (will subsequently keep this
   // up to date by watching maps, so do the initial load before
@@ -189,8 +162,9 @@ int Mgr::init()
 
   // Start Objecter and wait for OSD map
   objecter->start();
+  lock.Unlock();  // Drop lock because OSDMap dispatch calls into my ms_dispatch
   objecter->wait_for_osd_map();
-  timer.init();
+  lock.Lock();
 
   monc->sub_want("mgrdigest", 0, 0);
 
@@ -212,12 +186,14 @@ int Mgr::init()
   // Wait for MgrDigest...?
   // TODO
 
-  finisher.start();
+  // assume finisher already initialized in background_init
 
   py_modules.init();
+  py_modules.start();
 
   dout(4) << "Complete." << dendl;
-  return 0;
+  initializing = false;
+  initialized = true;
 }
 
 void Mgr::load_all_metadata()
@@ -342,15 +318,12 @@ void Mgr::load_config()
   py_modules.insert_config(loaded);
 }
 
-void Mgr::handle_signal(int signum)
+void Mgr::shutdown()
 {
+  // FIXME: pre-empt init() if it is currently running, so that it will
+  // give up the lock for us.
   Mutex::Locker l(lock);
-  assert(signum == SIGINT || signum == SIGTERM);
-  shutdown();
-}
 
-void Mgr::shutdown()
-{
   // First stop the server so that we're not taking any more incoming requests
   server.shutdown();
 
@@ -358,13 +331,7 @@ void Mgr::shutdown()
   // to touch references to the things we're about to tear down
   finisher.stop();
 
-  //lock.Lock();
-  timer.shutdown();
   objecter->shutdown();
-  //lock.Unlock();
-
-  monc->shutdown();
-  client_messenger->shutdown();
 }
 
 void Mgr::handle_osd_map()
@@ -545,51 +512,6 @@ void Mgr::handle_fs_map(MFSMap* m)
 }
 
 
-bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
-                         bool force_new)
-{
-  if (dest_type == CEPH_ENTITY_TYPE_MON)
-    return true;
-
-  if (force_new) {
-    if (monc->wait_auth_rotating(10) < 0)
-      return false;
-  }
-
-  *authorizer = monc->auth->build_authorizer(dest_type);
-  return *authorizer != NULL;
-}
-
-// A reference for use by the signal handler
-Mgr *signal_mgr = nullptr;
-
-static void handle_mgr_signal(int signum)
-{
-  if (signal_mgr) {
-    signal_mgr->handle_signal(signum);
-  }
-}
-
-int Mgr::main(vector<const char *> args)
-{
-  py_modules.start();
-
-  // Enable signal handlers
-  signal_mgr = this;
-  init_async_signal_handler();
-  register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
-  register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
-
-  client_messenger->wait();
-
-  // Disable signal handlers
-  unregister_async_signal_handler(SIGINT, handle_mgr_signal);
-  unregister_async_signal_handler(SIGTERM, handle_mgr_signal);
-  shutdown_async_signal_handler();
-  signal_mgr = nullptr;
-}
-
-
 void Mgr::handle_mgr_digest(MMgrDigest* m)
 {
   dout(10) << m->mon_status_json.length() << dendl;
index e85358f06b7d63aa67ee1b87c89197991a221716..825edf153335357aae1c6aaaadbb8808a6245544 100644 (file)
@@ -11,8 +11,8 @@
  * Foundation.  See file COPYING.
  */
 
-#ifndef CEPH_PYFOO_H_
-#define CEPH_PYFOO_H_
+#ifndef CEPH_MGR_H_
+#define CEPH_MGR_H_
 
 // Python.h comes first because otherwise it clobbers ceph's assert
 #include "Python.h"
@@ -26,7 +26,6 @@
 #include "osdc/Objecter.h"
 #include "mds/FSMap.h"
 #include "messages/MFSMap.h"
-#include "msg/Dispatcher.h"
 #include "msg/Messenger.h"
 #include "auth/Auth.h"
 #include "common/Finisher.h"
@@ -44,10 +43,10 @@ class MMgrDigest;
 
 class MgrPyModule;
 
-class Mgr : public Dispatcher {
+class Mgr {
 protected:
-  Objecter  *objecter;
   MonClient *monc;
+  Objecter  *objecter;
   Messenger *client_messenger;
 
   Mutex lock;
@@ -64,24 +63,27 @@ protected:
 
   void load_config();
   void load_all_metadata();
+  void init();
+
+  bool initialized;
+  bool initializing;
 
 public:
-  Mgr();
+  Mgr(MonClient *monc_, Messenger *clientm_);
   ~Mgr();
 
+  bool is_initialized() const {return initialized;}
+  entity_addr_t get_server_addr() const { return server.get_myaddr(); }
+
   void handle_mgr_digest(MMgrDigest* m);
   void handle_fs_map(MFSMap* m);
   void handle_osd_map();
+
   bool ms_dispatch(Message *m);
-  bool ms_handle_reset(Connection *con) { return false; }
-  void ms_handle_remote_reset(Connection *con) {}
-  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
-                         bool force_new);
-  int init();
+
+  void background_init();
   void shutdown();
-  void usage() {}
   int main(vector<const char *> args);
-  void handle_signal(int signum);
 };
 
-#endif /* MDS_UTILITY_H_ */
+#endif
index 2fbbc8048d158da26e3458767afd7e63b252ead0..8af29a52222a378c3394aa88377a1cda184d5d7b 100644 (file)
@@ -44,14 +44,6 @@ void MgrClient::init()
   assert(msgr != nullptr);
 
   timer.init();
-
-#if 0
-  if (map.epoch == 0) {
-    ldout(cct, 4) << "no map yet, waiting..." << dendl;
-    wait_on_list(waiting_for_map);
-  }
-  ldout(cct, 4) << "proceeding with map " << map.epoch << dendl;
-#endif
 }
 
 void MgrClient::shutdown()
@@ -98,38 +90,50 @@ bool MgrClient::handle_mgr_map(MMgrMap *m)
   if (session == nullptr || 
       session->con->get_peer_addr() != map.get_active_addr()) {
 
-    entity_inst_t inst;
-    inst.addr = map.get_active_addr();
-    inst.name = entity_name_t::MGR(map.get_active_gid());
-
-    delete session;
-    session = new MgrSessionState();
-    session->con = msgr->get_connection(inst);
-
-    // Don't send an open if we're just a client (i.e. doing
-    // command-sending, not stats etc)
-    if (g_conf && !g_conf->name.is_client()) {
-      auto open = new MMgrOpen();
-      open->daemon_name = g_conf->name.get_id();
-      session->con->send_message(open);
+    if (session) {
+      ldout(cct, 4) << "Terminating session with "
+                    << session->con->get_peer_addr() << dendl;
+      delete session;
+      session = nullptr;
+
+      std::vector<ceph_tid_t> erase_cmds;
+      auto commands = command_table.get_commands();
+      for (const auto &i : commands) {
+        // FIXME be nicer, retarget command on new mgr?
+        if (i.second->on_finish != nullptr) {
+          i.second->on_finish->complete(-ETIMEDOUT);
+        }
+        erase_cmds.push_back(i.first);
+      }
+      for (const auto &tid : erase_cmds) {
+        command_table.erase(tid);
+      }
     }
 
-    std::vector<ceph_tid_t> erase_cmds;
-    auto commands = command_table.get_commands();
-    for (const auto &i : commands) {
-      // FIXME be nicer, retarget command on new mgr?
-      if (i.second->on_finish != nullptr) {
-        i.second->on_finish->complete(-ETIMEDOUT);
+    if (map.get_available()) {
+      ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
+                    << dendl;
+      entity_inst_t inst;
+      inst.addr = map.get_active_addr();
+      inst.name = entity_name_t::MGR(map.get_active_gid());
+
+      session = new MgrSessionState();
+      session->con = msgr->get_connection(inst);
+
+      // Don't send an open if we're just a client (i.e. doing
+      // command-sending, not stats etc)
+      if (g_conf && !g_conf->name.is_client()) {
+        auto open = new MMgrOpen();
+        open->daemon_name = g_conf->name.get_id();
+        session->con->send_message(open);
       }
-      erase_cmds.push_back(i.first);
-    }
-    for (const auto &tid : erase_cmds) {
-      command_table.erase(tid);
+
+      signal_cond_list(waiting_for_session);
+    } else {
+      ldout(cct, 4) << "No active mgr available yet" << dendl;
     }
   }
 
-  signal_cond_list(waiting_for_map);
-
   return true;
 }
 
@@ -262,16 +266,13 @@ int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
 
   ldout(cct, 20) << "cmd: " << cmd << dendl;
 
-  assert(map.epoch > 0);
-
-
-
   if (session == nullptr) {
-    derr << "no session" << dendl;
-    // FIXME: be nicer: maybe block until a mgr is available?
-    return -ENOENT;
+    derr << "no session, waiting" << dendl;
+    wait_on_list(waiting_for_session);
   }
 
+  assert(map.epoch > 0);
+
   MgrCommand *op = command_table.start_command();
   op->cmd = cmd;
   op->inbl = inbl;
index bcaf3de5ea5cb8f2952a0d5f55fdc9dfd2fc08c9..92775d62e3b1a833e7ab054a7ea47044555d058b 100644 (file)
@@ -63,7 +63,8 @@ protected:
 
   void wait_on_list(list<Cond*>& ls);
   void signal_cond_list(list<Cond*>& ls);
-  list<Cond*> waiting_for_map;
+
+  list<Cond*> waiting_for_session;
 
 public:
   MgrClient(CephContext *cct_, Messenger *msgr_);
diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc
new file mode 100644 (file)
index 0000000..896fb16
--- /dev/null
@@ -0,0 +1,230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "common/errno.h"
+#include "mon/MonClient.h"
+#include "include/stringify.h"
+#include "global/global_context.h"
+#include "global/signal_handler.h"
+
+#include "mgr/MgrContext.h"
+
+#include "messages/MMgrBeacon.h"
+#include "messages/MMgrMap.h"
+#include "Mgr.h"
+
+#include "MgrStandby.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+
+MgrStandby::MgrStandby() :
+  Dispatcher(g_ceph_context),
+  monc(new MonClient(g_ceph_context)),
+  lock("MgrStandby::lock"),
+  timer(g_ceph_context, lock),
+  active_mgr(nullptr)
+{
+  client_messenger = Messenger::create_client_messenger(g_ceph_context, "mgr");
+}
+
+
+MgrStandby::~MgrStandby()
+{
+  delete monc;
+  delete client_messenger;
+  delete active_mgr;
+}
+
+
+int MgrStandby::init()
+{
+  Mutex::Locker l(lock);
+
+  // Initialize Messenger
+  client_messenger->add_dispatcher_tail(this);
+  client_messenger->start();
+
+  // Initialize MonClient
+  if (monc->build_initial_monmap() < 0) {
+    client_messenger->shutdown();
+    client_messenger->wait();
+    return -1;
+  }
+
+  monc->sub_want("mgrmap", 0, 0);
+
+  monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
+      |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
+  monc->set_messenger(client_messenger);
+  monc->init();
+  int r = monc->authenticate();
+  if (r < 0) {
+    derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
+    monc->shutdown();
+    client_messenger->shutdown();
+    client_messenger->wait();
+    return r;
+  }
+
+  client_t whoami = monc->get_global_id();
+  client_messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+
+  timer.init();
+  send_beacon();
+
+  dout(4) << "Complete." << dendl;
+  return 0;
+}
+
+void MgrStandby::send_beacon()
+{
+  assert(lock.is_locked_by_me());
+  dout(1) << state_str() << dendl;
+  dout(10) << "sending beacon as gid " << monc->get_global_id() << dendl;
+
+  bool available = active_mgr != nullptr && active_mgr->is_initialized();
+  auto addr = available ? active_mgr->get_server_addr() : entity_addr_t();
+  MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(),
+                                 g_conf->name.get_id(),
+                                 addr,
+                                 available);
+                                 
+  monc->send_mon_message(m);
+  // TODO configure period
+  timer.add_event_after(5, new C_StdFunction(
+        [this](){
+          send_beacon();
+        }
+  )); 
+}
+
+void MgrStandby::handle_signal(int signum)
+{
+  Mutex::Locker l(lock);
+  assert(signum == SIGINT || signum == SIGTERM);
+  shutdown();
+}
+
+void MgrStandby::shutdown()
+{
+  // Expect already to be locked as we're called from signal handler
+  assert(lock.is_locked_by_me());
+
+  if (active_mgr) {
+    active_mgr->shutdown();
+  }
+
+  timer.shutdown();
+
+  monc->shutdown();
+  client_messenger->shutdown();
+}
+
+bool MgrStandby::ms_dispatch(Message *m)
+{
+  Mutex::Locker l(lock);
+  dout(4) << state_str() << " " << *m << dendl;
+
+  switch (m->get_type()) {
+    case MSG_MGR_MAP:
+      {
+        auto mmap = static_cast<MMgrMap*>(m);
+        auto map = mmap->get_map();
+        dout(4) << "received map epoch " << map.get_epoch() << dendl;
+        const bool active_in_map = map.active_gid == monc->get_global_id();
+        dout(4) << "active in map: " << active_in_map
+                << " active is " << map.active_gid << dendl;
+        if (active_in_map) {
+          if (active_mgr == nullptr) {
+            dout(1) << "Activating!" << dendl;
+            active_mgr = new Mgr(monc, client_messenger);
+            active_mgr->background_init();
+            dout(1) << "I am now active" << dendl;
+          } else {
+            dout(10) << "I was already active" << dendl;
+          }
+        } else {
+          if (active_mgr != nullptr) {
+            derr << "I was active but no longer am" << dendl;
+            shutdown();
+            // FIXME: should politely go back to standby (or respawn
+            // process) instead of stopping entirely.
+          }
+        }
+      }
+      break;
+
+    default:
+      if (active_mgr) {
+        return active_mgr->ms_dispatch(m);
+      } else {
+        return false;
+      }
+  }
+  return true;
+}
+
+
+bool MgrStandby::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new)
+{
+  if (dest_type == CEPH_ENTITY_TYPE_MON)
+    return true;
+
+  if (force_new) {
+    if (monc->wait_auth_rotating(10) < 0)
+      return false;
+  }
+
+  *authorizer = monc->auth->build_authorizer(dest_type);
+  return *authorizer != NULL;
+}
+
+// A reference for use by the signal handler
+MgrStandby *signal_mgr = nullptr;
+
+static void handle_mgr_signal(int signum)
+{
+  if (signal_mgr) {
+    signal_mgr->handle_signal(signum);
+  }
+}
+
+int MgrStandby::main(vector<const char *> args)
+{
+  // Enable signal handlers
+  signal_mgr = this;
+  init_async_signal_handler();
+  register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
+  register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
+
+  client_messenger->wait();
+
+  // Disable signal handlers
+  unregister_async_signal_handler(SIGINT, handle_mgr_signal);
+  unregister_async_signal_handler(SIGTERM, handle_mgr_signal);
+  shutdown_async_signal_handler();
+  signal_mgr = nullptr;
+
+  return 0;
+}
+
+
+std::string MgrStandby::state_str()
+{
+  return active_mgr == nullptr ? "standby" : "active";
+}
+
diff --git a/src/mgr/MgrStandby.h b/src/mgr/MgrStandby.h
new file mode 100644 (file)
index 0000000..ccb44d7
--- /dev/null
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#ifndef MGR_STANDBY_H_
+#define MGR_STANDBY_H_
+
+#include "auth/Auth.h"
+#include "common/Finisher.h"
+#include "common/Timer.h"
+
+#include "DaemonServer.h"
+#include "PyModules.h"
+
+#include "DaemonState.h"
+#include "ClusterState.h"
+
+class Mgr;
+
+class MgrStandby : public Dispatcher {
+protected:
+  MonClient *monc;
+  Messenger *client_messenger;
+
+  Mutex lock;
+  SafeTimer timer;
+
+  Mgr *active_mgr;
+
+  std::string state_str();
+
+public:
+  MgrStandby();
+  ~MgrStandby();
+
+  bool ms_dispatch(Message *m);
+  bool ms_handle_reset(Connection *con) { return false; }
+  void ms_handle_remote_reset(Connection *con) {}
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new);
+  int init();
+  void shutdown();
+  void usage() {}
+  int main(vector<const char *> args);
+  void handle_signal(int signum);
+  void send_beacon();
+};
+
+#endif
+
index 40bc68f9cee6cb04ce391d4eee709ad976d44090..e20940c54a81597e55f56387fecbc86a5d4a9f11 100644 (file)
@@ -312,12 +312,12 @@ public:
 
 void PyModules::start()
 {
-  {
-    Mutex::Locker l(lock);
-    for (auto &i : modules) {
-      auto thread = new ServeThread(i.second);
-      serve_threads[i.first] = thread;
-    }
+  Mutex::Locker l(lock);
+
+  dout(1) << "Creating threads for " << modules.size() << " modules" << dendl;
+  for (auto &i : modules) {
+    auto thread = new ServeThread(i.second);
+    serve_threads[i.first] = thread;
   }
 
   for (auto &i : serve_threads) {
index 9606de5cd546a466960b9fd2def90adaaecfe74b..909f40aa0df477a3886305fd03088493fdbce7ba 100644 (file)
 #ifndef MGR_MAP_H_
 #define MGR_MAP_H_
 
+#include <sstream>
+
 #include "msg/msg_types.h"
+#include "common/Formatter.h"
 #include "include/encoding.h"
 
+class StandbyInfo
+{
+public:
+  uint64_t gid;
+  std::string name;
+
+  StandbyInfo(uint64_t gid_, const std::string &name_)
+    : gid(gid_), name(name_)
+  {}
+
+  StandbyInfo()
+    : gid(0)
+  {}
+
+  void encode(bufferlist& bl) const
+  {
+    ENCODE_START(1, 1, bl);
+    ::encode(gid, bl);
+    ::encode(name, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& p)
+  {
+    DECODE_START(1, p);
+    ::decode(gid, p);
+    ::decode(name, p);
+    DECODE_FINISH(p);
+  }
+};
+WRITE_CLASS_ENCODER(StandbyInfo)
+
 class MgrMap
 {
 public:
+  epoch_t epoch;
+
+  // global_id of the ceph-mgr instance selected as a leader
   uint64_t active_gid;
+  // server address reported by the leader once it is active
   entity_addr_t active_addr;
-  epoch_t epoch;
+  // whether the nominated leader is active (i.e. has initialized its server)
+  bool available;
+  // the name (foo in mgr.<foo>) of the active daemon
+  std::string active_name;
+
+  std::map<uint64_t, StandbyInfo> standbys;
 
   epoch_t get_epoch() const { return epoch; }
   entity_addr_t get_active_addr() const { return active_addr; }
   uint64_t get_active_gid() const { return active_gid; }
+  bool get_available() const { return available; }
+  const std::string &get_active_name() const { return active_name; }
 
   void encode(bufferlist& bl, uint64_t features) const
   {
@@ -34,6 +80,9 @@ public:
     ::encode(epoch, bl);
     ::encode(active_addr, bl, features);
     ::encode(active_gid, bl);
+    ::encode(available, bl);
+    ::encode(active_name, bl);
+    ::encode(standbys, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -43,11 +92,55 @@ public:
     ::decode(epoch, p);
     ::decode(active_addr, p);
     ::decode(active_gid, p);
+    ::decode(available, p);
+    ::decode(active_name, p);
+    ::decode(standbys, p);
     DECODE_FINISH(p);
   }
 
+  void print_summary(Formatter *f, std::ostream *ss) const
+  {
+    // One or the other, not both
+    assert((ss != nullptr) != (f != nullptr));
+
+    if (f) {
+      f->dump_int("active_gid", get_active_gid());
+      f->dump_string("active_name", get_active_name());
+    } else {
+      if (get_active_gid() != 0) {
+        *ss << "active: " << get_active_name() << " ";
+      } else {
+        *ss << "no daemons active ";
+      }
+    }
+
+
+    if (f) {
+      f->open_array_section("standbys");
+      for (const auto &i : standbys) {
+        f->open_object_section("standby");
+        f->dump_int("gid", i.second.gid);
+        f->dump_string("name", i.second.name);
+        f->close_section();
+      }
+      f->close_section();
+    } else {
+      if (standbys.size()) {
+        *ss << "standbys: ";
+        bool first = true;
+        for (const auto &i : standbys) {
+          if (!first) {
+            *ss << ", ";
+          }
+          *ss << i.second.name;
+          first = false;
+        }
+      }
+    }
+  }
+
   MgrMap()
-    : epoch(0)
+    : epoch(0), available(false)
   {}
 };
 
index 66e2dcbacaeae75845a040a8d66d66b7ffeb72b3..4e34edfdce99659d626506917cc65f40d99490b4 100644 (file)
@@ -18,6 +18,7 @@
 #include "PGMap.h"
 #include "PGMonitor.h"
 #include "include/stringify.h"
+#include "mgr/MgrContext.h"
 
 #include "MgrMonitor.h"
 
@@ -71,6 +72,8 @@ bool MgrMonitor::preprocess_query(MonOpRequestRef op)
   switch (m->get_type()) {
     case MSG_MGR_BEACON:
       return preprocess_beacon(op);
+    case MSG_MON_COMMAND:
+      return preprocess_command(op);
     default:
       mon->no_reply(op);
       derr << "Unhandled message type " << m->get_type() << dendl;
@@ -84,6 +87,10 @@ bool MgrMonitor::prepare_update(MonOpRequestRef op)
   switch (m->get_type()) {
     case MSG_MGR_BEACON:
       return prepare_beacon(op);
+
+    case MSG_MON_COMMAND:
+      return prepare_command(op);
+
     default:
       mon->no_reply(op);
       derr << "Unhandled message type " << m->get_type() << dendl;
@@ -118,12 +125,70 @@ public:
 bool MgrMonitor::prepare_beacon(MonOpRequestRef op)
 {
   MMgrBeacon *m = static_cast<MMgrBeacon*>(op->get_req());
+  dout(4) << "beacon from " << m->get_gid() << dendl;
+
+  // See if we are seeing same name, new GID for the active daemon
+  if (m->get_name() == pending_map.active_name
+      && m->get_gid() != pending_map.active_gid)
+  {
+    dout(4) << "Active daemon restart (mgr." << m->get_name() << ")" << dendl;
+    drop_active();
+  }
+
+  // See if we are seeing same name, new GID for any standbys
+  for (const auto &i : pending_map.standbys) {
+    const StandbyInfo &s = i.second;
+    if (s.name == m->get_name() && s.gid != m->get_gid()) {
+      dout(4) << "Standby daemon restart (mgr." << m->get_name() << ")" << dendl;
+      drop_standby(i.first);
+      break;
+    }
+  }
+
+  last_beacon[m->get_gid()] = ceph_clock_now(g_ceph_context);
+
+  // Track whether we modified pending_map
+  bool updated = false;
+
+  if (pending_map.active_gid == m->get_gid()) {
+    // A beacon from the currently active daemon
+    if (pending_map.active_addr != m->get_server_addr()) {
+      dout(4) << "learned address " << m->get_server_addr() << dendl;
+      pending_map.active_addr = m->get_server_addr();
+      updated = true;
+    }
+
+    if (pending_map.get_available() != m->get_available()) {
+      dout(4) << "available " << m->get_gid() << dendl;
+      pending_map.available = m->get_available();
+      updated = true;
+    }
+  } else if (pending_map.active_gid == 0) {
+    // There is no currently active daemon, select this one.
+    if (pending_map.standbys.count(m->get_gid())) {
+      drop_standby(m->get_gid());
+    }
+    pending_map.active_gid = m->get_gid();
+    pending_map.active_name = m->get_name();
 
-  pending_map.active_gid = m->get_gid();
-  pending_map.active_addr = m->get_server_addr();
+    dout(4) << "selecting new active in epoch " << pending_map.epoch << dendl;
+    wait_for_finished_proposal(op, new C_Updated(this, op));
+  } else {
+    if (pending_map.standbys.count(m->get_gid()) > 0) {
+      dout(10) << "from existing standby " << m->get_gid() << dendl;
+    } else {
+      dout(10) << "new standby " << m->get_gid() << dendl;
+      pending_map.standbys[m->get_gid()] = {m->get_gid(), m->get_name()};
+      updated = true;
+    }
+  }
 
-  dout(4) << "proposing epoch " << pending_map.epoch << dendl;
-  wait_for_finished_proposal(op, new C_Updated(this, op));
+  if (updated) {
+    dout(4) << "updating map" << dendl;
+    wait_for_finished_proposal(op, new C_Updated(this, op));
+  } else {
+    dout(10) << "no change" << dendl;
+  }
 
   return true;
 }
@@ -162,6 +227,8 @@ void MgrMonitor::check_sub(Subscription *sub)
  */
 void MgrMonitor::send_digests()
 {
+  digest_callback = nullptr;
+
   const std::string type = "mgrdigest";
   if (mon->session_map.subs.count(type) == 0)
     return;
@@ -182,11 +249,214 @@ void MgrMonitor::send_digests()
 
     sub->session->con->send_message(mdigest);
   }
+
+  digest_callback = new C_StdFunction([this](){
+      send_digests();
+  });
+  mon->timer.add_event_after(5, digest_callback);
 }
 
 void MgrMonitor::tick()
 {
-  // TODO control frequency independently of the global tick frequency
-  send_digests();
+  const utime_t now = ceph_clock_now(g_ceph_context);
+  utime_t cutoff = now;
+  cutoff -= g_conf->mon_mgr_beacon_grace;
+
+  // Populate any missing beacons (i.e. no beacon since MgrMonitor
+  // instantiation) with the current time, so that they will
+  // eventually look laggy if they fail to give us a beacon.
+  if (pending_map.active_gid != 0
+      && last_beacon.count(pending_map.active_gid) == 0) {
+    last_beacon[pending_map.active_gid] = now;
+  }
+  for (auto s : pending_map.standbys) {
+    if (last_beacon.count(s.first) == 0) {
+      last_beacon[s.first] = now;
+    }
+  }
+
+  // Cull standbys first so that any remaining standbys
+  // will be eligible to take over from the active if we cull him.
+  std::list<uint64_t> dead_standbys;
+  for (const auto &i : pending_map.standbys) {
+    auto last_beacon_time = last_beacon.at(i.first);
+    if (last_beacon_time < cutoff) {
+      dead_standbys.push_back(i.first);
+    }
+  }
+
+  bool propose = false;
+
+  for (auto i : dead_standbys) {
+    dout(4) << "Dropping laggy standby " << i << dendl;
+    drop_standby(i);
+    propose = true;
+  }
+
+  if (pending_map.active_gid != 0
+      && last_beacon.at(pending_map.active_gid) < cutoff) {
+
+    drop_active();
+    dout(4) << "Dropping active" << pending_map.active_gid << dendl;
+    if (promote_standby()) {
+      dout(4) << "Promoted standby " << pending_map.active_gid << dendl;
+      propose = true;
+    } else {
+      dout(4) << "Active is laggy but have no standbys to replace it" << dendl;
+    }
+  } else if (pending_map.active_gid == 0) {
+    if (promote_standby()) {
+      dout(4) << "Promoted standby " << pending_map.active_gid << dendl;
+      propose = true;
+    }
+  }
+
+  if (propose) {
+    propose_pending();
+  }
+}
+
+bool MgrMonitor::promote_standby()
+{
+  assert(pending_map.active_gid == 0);
+  if (pending_map.standbys.size()) {
+    // Promote a replacement (arbitrary choice of standby)
+    auto replacement_gid = pending_map.standbys.begin()->first;
+    pending_map.active_gid = replacement_gid;
+    pending_map.active_name = pending_map.standbys.at(replacement_gid).name;
+    pending_map.available = false;
+    pending_map.active_addr = entity_addr_t();
+
+    drop_standby(replacement_gid);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void MgrMonitor::drop_active()
+{
+  if (last_beacon.count(pending_map.active_gid) > 0) {
+    last_beacon.erase(pending_map.active_gid);
+  }
+
+  pending_map.active_name = "";
+  pending_map.active_gid = 0;
+  pending_map.available = false;
+  pending_map.active_addr = entity_addr_t();
+}
+
+void MgrMonitor::drop_standby(uint64_t gid)
+{
+  pending_map.standbys.erase(gid);
+  if (last_beacon.count(gid) > 0) {
+    last_beacon.erase(gid);
+  }
+
+}
+
+bool MgrMonitor::preprocess_command(MonOpRequestRef op)
+{
+  return false;
+
+}
+
+bool MgrMonitor::prepare_command(MonOpRequestRef op)
+{
+  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+
+  std::stringstream ss;
+  bufferlist rdata;
+
+  std::map<std::string, cmd_vartype> cmdmap;
+  if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
+    string rs = ss.str();
+    mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
+    return true;
+  }
+
+  MonSession *session = m->get_session();
+  if (!session) {
+    mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
+    return true;
+  }
+
+  string prefix;
+  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+
+  int r = 0;
+
+  if (prefix == "mgr fail") {
+    string who;
+    cmd_getval(g_ceph_context, cmdmap, "who", who);
+
+    std::string err;
+    uint64_t gid = strict_strtol(who.c_str(), 10, &err);
+    bool changed = false;
+    if (!err.empty()) {
+      // Does not parse as a gid, treat it as a name
+      if (pending_map.active_name == who) {
+        drop_active();
+        changed = true;
+      } else {
+        gid = 0;
+        for (const auto &i : pending_map.standbys) {
+          if (i.second.name == who) {
+            gid = i.first;
+          }
+        }
+        if (gid != 0) {
+          drop_standby(gid);
+          changed = true;
+        } else {
+          ss << "Daemon not found '" << who << "', already failed?";
+        }
+      }
+    } else {
+      if (pending_map.active_gid == gid) {
+        drop_active();
+        changed = true;
+      } else if (pending_map.standbys.count(gid) > 0) {
+        drop_standby(gid);
+        changed = true;
+      } else {
+        ss << "Daemon not found '" << gid << "', already failed?";
+      }
+    }
+
+    if (changed) {
+      tick();
+    }
+  } else {
+    r = -ENOSYS;
+  }
+
+  dout(4) << __func__ << " done, r=" << r << dendl;
+  /* Compose response */
+  string rs;
+  getline(ss, rs);
+
+  if (r >= 0) {
+    // success.. delay reply
+    wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, r, rs,
+                                             get_last_committed() + 1));
+    return true;
+  } else {
+    // reply immediately
+    mon->reply_command(op, r, rs, rdata, get_last_committed());
+    return false;
+  }
+}
+
+void MgrMonitor::init()
+{
+  send_digests();  // To get it to schedule its own event
+}
+
+void MgrMonitor::on_shutdown()
+{
+  if (digest_callback) {
+    mon->timer.cancel_event(digest_callback);
+  }
 }
 
index 51850893ed1c3c23d528afdfa0aa87fe2f1c4081..bd4ac4599bf482d6fddbba068a091e4913b7e585 100644 (file)
  */
 
 
+#include "include/Context.h"
 #include "MgrMap.h"
 #include "PaxosService.h"
 
+
 class MgrMonitor : public PaxosService
 {
   MgrMap map;
   MgrMap pending_map;
 
+  std::map<uint64_t, utime_t> last_beacon;
+
+  /**
+   * If a standby is available, make it active, given that
+   * there is currently no active daemon.
+   *
+   * @return true if a standby was promoted
+   */
+  bool promote_standby();
+  void drop_active();
+  void drop_standby(uint64_t gid);
+
+  Context *digest_callback;
+
 public:
   MgrMonitor(Monitor *mn, Paxos *p, const string& service_name)
-    : PaxosService(mn, p, service_name)
+    : PaxosService(mn, p, service_name), digest_callback(nullptr)
   {}
 
+  void init();
+  void on_shutdown();
+
+  const MgrMap &get_map() const { return map; }
+
+  bool in_use() const { return map.epoch > 0; }
+
   void create_initial();
   void update_from_paxos(bool *need_bootstrap);
   void create_pending();
   void encode_pending(MonitorDBStore::TransactionRef t);
+
   bool preprocess_query(MonOpRequestRef op);
   bool prepare_update(MonOpRequestRef op);
+
+  bool preprocess_command(MonOpRequestRef op);
+  bool prepare_command(MonOpRequestRef op);
+
   void encode_full(MonitorDBStore::TransactionRef t) { }
 
   bool preprocess_beacon(MonOpRequestRef op);
@@ -42,6 +70,8 @@ public:
 
   void tick();
 
+  void print_summary(Formatter *f, std::ostream *ss) const;
+
   friend class C_Updated;
 };
 
index 5dc1eed785c86d37d44359b37a9266bb30a61084..035fe5d4f43053c9435ae09914ceb36097f0bf47 100644 (file)
@@ -842,3 +842,10 @@ COMMAND("config-key exists " \
        "name=key,type=CephString", \
        "check for <key>'s existence", "config-key", "r", "cli,rest")
 COMMAND("config-key list ", "list keys", "config-key", "r", "cli,rest")
+
+
+/*
+ * mon/MgrMonitor.cc
+ */
+COMMAND("mgr fail name=who,type=CephString", \
+       "treat the named manager daemon as failed", "mgr", "rw", "cli,rest")
index 9ff47ce86ed533b8b2e0f04e8f7fda218e74210a..67cf14304a8751fe98ff95827eb6556836234b0e 100644 (file)
@@ -2466,6 +2466,10 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     f->open_object_section("fsmap");
     mdsmon()->fsmap.print_summary(f, NULL);
     f->close_section();
+
+    f->open_object_section("mgrmap");
+    mgrmon()->get_map().print_summary(f, nullptr);
+    f->close_section();
     f->close_section();
   } else {
     ss << "    cluster " << monmap->get_fsid() << "\n";
@@ -2477,6 +2481,11 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     if (mdsmon()->fsmap.any_filesystems()) {
       ss << "      fsmap " << mdsmon()->fsmap << "\n";
     }
+    if (mgrmon()->in_use()) {
+      ss << "        mgr ";
+      mgrmon()->get_map().print_summary(nullptr, &ss);
+      ss << "\n";
+    }
 
     osdmon()->osdmap.print_summary(NULL, ss);
     pgmon()->pg_map.print_summary(NULL, &ss);
@@ -2801,6 +2810,11 @@ void Monitor::handle_command(MonOpRequestRef op)
     return;
   }
 
+  if (module == "mgr") {
+    mgrmon()->dispatch(op);
+    return;
+  }
+
   if (prefix == "fsid") {
     if (f) {
       f->open_object_section("fsid");