]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgrc: create MgrClient
authorJohn Spray <john.spray@redhat.com>
Thu, 30 Jun 2016 13:05:47 +0000 (14:05 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 29 Sep 2016 16:26:54 +0000 (17:26 +0100)
The ceph-mgr equivalent to monclient

Signed-off-by: John Spray <john.spray@redhat.com>
src/mgr/MgrClient.cc [new file with mode: 0644]
src/mgr/MgrClient.h [new file with mode: 0644]

diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc
new file mode 100644 (file)
index 0000000..2fbbc80
--- /dev/null
@@ -0,0 +1,318 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#include "MgrClient.h"
+
+#include "mgr/MgrContext.h"
+
+#include "msg/Messenger.h"
+#include "messages/MMgrMap.h"
+#include "messages/MMgrReport.h"
+#include "messages/MMgrOpen.h"
+#include "messages/MMgrConfigure.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+
+#define dout_subsys ceph_subsys_mgrc
+#undef dout_prefix
+#define dout_prefix *_dout << "mgrc " << __func__ << " "
+
+MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
+    : Dispatcher(cct_), cct(cct_), msgr(msgr_),
+      session(nullptr),
+      lock("mgrc"),
+      timer(cct_, lock)
+{
+  assert(cct != nullptr);
+}
+
+void MgrClient::init()
+{
+  Mutex::Locker l(lock);
+
+  assert(msgr != nullptr);
+
+  timer.init();
+
+#if 0
+  if (map.epoch == 0) {
+    ldout(cct, 4) << "no map yet, waiting..." << dendl;
+    wait_on_list(waiting_for_map);
+  }
+  ldout(cct, 4) << "proceeding with map " << map.epoch << dendl;
+#endif
+}
+
+void MgrClient::shutdown()
+{
+  Mutex::Locker l(lock);
+
+  timer.shutdown();
+}
+
+bool MgrClient::ms_dispatch(Message *m)
+{
+  Mutex::Locker l(lock);
+
+  ldout(cct, 20) << *m << dendl;
+  switch(m->get_type()) {
+  case MSG_MGR_MAP:
+    return handle_mgr_map(static_cast<MMgrMap*>(m));
+  case MSG_MGR_CONFIGURE:
+    return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
+  case MSG_COMMAND_REPLY:
+    if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
+      handle_command_reply(static_cast<MCommandReply*>(m));
+      return true;
+    } else {
+      return false;
+    }
+  default:
+    ldout(cct, 10) << "Not handling " << *m << dendl; 
+    return false;
+  }
+}
+
+bool MgrClient::handle_mgr_map(MMgrMap *m)
+{
+  assert(lock.is_locked_by_me());
+
+  map = m->get_map();
+  ldout(cct, 4) << "Got map version " << map.epoch << dendl;
+  m->put();
+
+  ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl;
+
+  // Reset session?
+  if (session == nullptr || 
+      session->con->get_peer_addr() != map.get_active_addr()) {
+
+    entity_inst_t inst;
+    inst.addr = map.get_active_addr();
+    inst.name = entity_name_t::MGR(map.get_active_gid());
+
+    delete session;
+    session = new MgrSessionState();
+    session->con = msgr->get_connection(inst);
+
+    // Don't send an open if we're just a client (i.e. doing
+    // command-sending, not stats etc)
+    if (g_conf && !g_conf->name.is_client()) {
+      auto open = new MMgrOpen();
+      open->daemon_name = g_conf->name.get_id();
+      session->con->send_message(open);
+    }
+
+    std::vector<ceph_tid_t> erase_cmds;
+    auto commands = command_table.get_commands();
+    for (const auto &i : commands) {
+      // FIXME be nicer, retarget command on new mgr?
+      if (i.second->on_finish != nullptr) {
+        i.second->on_finish->complete(-ETIMEDOUT);
+      }
+      erase_cmds.push_back(i.first);
+    }
+    for (const auto &tid : erase_cmds) {
+      command_table.erase(tid);
+    }
+  }
+
+  signal_cond_list(waiting_for_map);
+
+  return true;
+}
+
+bool MgrClient::ms_handle_reset(Connection *con)
+{
+#if 0
+  Mutex::Locker lock(monc_lock);
+
+  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+    if (cur_mon.empty() || con != cur_con) {
+      ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
+      return true;
+    } else {
+      ldout(cct, 10) << "ms_handle_reset current mon " << con->get_peer_addr() << dendl;
+      if (hunting)
+       return true;
+      
+      ldout(cct, 0) << "hunting for new mon" << dendl;
+      _reopen_session();
+    }
+  }
+  return false;
+#else
+  return true;
+#endif
+}
+
+
+void MgrClient::send_report()
+{
+  assert(lock.is_locked_by_me());
+  assert(session);
+
+  auto report = new MMgrReport();
+  auto pcc = cct->get_perfcounters_collection();
+
+  pcc->with_counters([this, report](
+        const PerfCountersCollection::CounterMap &by_path)
+  {
+    bool const declared_all = (session->declared.size() == by_path.size());
+
+    if (!declared_all) {
+      for (const auto &i : by_path) {
+        auto path = i.first;
+        auto data = *(i.second);
+        
+        if (session->declared.count(path) == 0) {
+          PerfCounterType type;
+          type.path = path;
+          if (data.description) {
+            type.description = data.description;
+          }
+          if (data.nick) {
+            type.nick = data.nick;
+          }
+          type.type = data.type;
+          report->declare_types.push_back(std::move(type));
+          session->declared.insert(path);
+        }
+      }
+    }
+
+    ldout(cct, 20) << by_path.size() << " counters, of which "
+             << report->declare_types.size() << " new" << dendl;
+
+    ENCODE_START(1, 1, report->packed);
+    for (const auto &path : session->declared) {
+      auto data = by_path.at(path);
+      ::encode(static_cast<uint64_t>(data->u64.read()),
+          report->packed);
+      if (data->type & PERFCOUNTER_LONGRUNAVG) {
+        ::encode(static_cast<uint64_t>(data->avgcount.read()),
+            report->packed);
+        ::encode(static_cast<uint64_t>(data->avgcount2.read()),
+            report->packed);
+      }
+    }
+    ENCODE_FINISH(report->packed);
+  });
+
+  ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
+
+  report->daemon_name = g_conf->name.get_id();
+
+  session->con->send_message(report);
+
+  if (stats_period != 0) {
+    auto c = new C_StdFunction([this](){send_report();});
+    timer.add_event_after(stats_period, c);
+  }
+}
+
+bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
+{
+  assert(lock.is_locked_by_me());
+
+  ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
+
+  bool starting = (stats_period == 0) && (m->stats_period != 0);
+  stats_period = m->stats_period;
+  if (starting) {
+    send_report();
+  }
+
+  m->put();
+  return true;
+}
+
+void MgrClient::wait_on_list(list<Cond*>& ls)
+{
+  assert(lock.is_locked_by_me());
+
+  Cond cond;
+  ls.push_back(&cond);
+  cond.Wait(lock);
+  ls.remove(&cond);
+}
+
+void MgrClient::signal_cond_list(list<Cond*>& ls)
+{
+  for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
+    (*it)->Signal();
+}
+
+int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
+                  bufferlist *outbl, string *outs,
+                  Context *onfinish)
+{
+  Mutex::Locker l(lock);
+
+  ldout(cct, 20) << "cmd: " << cmd << dendl;
+
+  assert(map.epoch > 0);
+
+
+
+  if (session == nullptr) {
+    derr << "no session" << dendl;
+    // FIXME: be nicer: maybe block until a mgr is available?
+    return -ENOENT;
+  }
+
+  MgrCommand *op = command_table.start_command();
+  op->cmd = cmd;
+  op->inbl = inbl;
+  op->outbl = outbl;
+  op->outs = outs;
+  op->on_finish = onfinish;
+
+  // Leaving fsid argument null because it isn't used.
+  MCommand *m = op->get_message({});
+  session->con->send_message(m);
+
+  return 0;
+}
+
+bool MgrClient::handle_command_reply(MCommandReply *m)
+{
+  assert(lock.is_locked_by_me());
+
+  const auto tid = m->get_tid();
+  const auto op = command_table.get_command(tid);
+  if (op == nullptr) {
+    ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
+            << " not found" << dendl;
+    m->put();
+    return true;
+  }
+
+  if (op->outbl) {
+    op->outbl->claim(m->get_data());
+  }
+
+  if (op->outs) {
+    *(op->outs) = m->rs;
+  }
+
+  if (op->on_finish) {
+    op->on_finish->complete(m->r);
+  }
+
+  command_table.erase(tid);
+
+  m->put();
+  return true;
+}
diff --git a/src/mgr/MgrClient.h b/src/mgr/MgrClient.h
new file mode 100644 (file)
index 0000000..bcaf3de
--- /dev/null
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef MGR_CLIENT_H_
+#define MGR_CLIENT_H_
+
+#include "msg/Dispatcher.h"
+#include "mon/MgrMap.h"
+
+#include "msg/Connection.h"
+
+#include "common/perf_counters.h"
+#include "common/Timer.h"
+#include "common/CommandTable.h"
+
+class MMgrMap;
+class MMgrConfigure;
+class Messenger;
+class MCommandReply;
+
+class MgrSessionState
+{
+  public:
+  // Which performance counters have we already transmitted schema for?
+  std::set<std::string> declared;
+
+  // Our connection to the mgr
+  ConnectionRef con;
+};
+
+class MgrCommand : public CommandOp
+{
+  public:
+
+  MgrCommand(ceph_tid_t t) : CommandOp(t) {}
+};
+
+class MgrClient : public Dispatcher
+{
+protected:
+  CephContext *cct;
+  MgrMap map;
+  Messenger *msgr;
+
+  MgrSessionState *session;
+
+  Mutex lock;
+
+  uint32_t stats_period;
+  SafeTimer     timer;
+
+  CommandTable<MgrCommand> command_table;
+
+  void wait_on_list(list<Cond*>& ls);
+  void signal_cond_list(list<Cond*>& ls);
+  list<Cond*> waiting_for_map;
+
+public:
+  MgrClient(CephContext *cct_, Messenger *msgr_);
+
+  void set_messenger(Messenger *msgr_) { msgr = msgr_; }
+
+  void init();
+  void shutdown();
+
+  bool ms_dispatch(Message *m);
+  bool ms_handle_reset(Connection *con);
+  void ms_handle_remote_reset(Connection *con) {}
+
+  bool handle_mgr_map(MMgrMap *m);
+  bool handle_mgr_configure(MMgrConfigure *m);
+  bool handle_command_reply(MCommandReply *m);
+
+  void send_report();
+
+  int start_command(const vector<string>& cmd, const bufferlist& inbl,
+                   bufferlist *outbl, string *outs,
+                   Context *onfinish);
+};
+
+#endif
+