]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/mgr: add mgr client
authorKefu Chai <kchai@redhat.com>
Mon, 4 Mar 2019 09:13:29 +0000 (17:13 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 20 Mar 2019 09:34:00 +0000 (17:34 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/CMakeLists.txt
src/crimson/mgr/client.cc [new file with mode: 0644]
src/crimson/mgr/client.h [new file with mode: 0644]

index 05d6f148f39549e2b97be191541514fbf1a0d612..20c545dad178408f7b9a587ddfad85aae435d59b 100644 (file)
@@ -112,6 +112,8 @@ target_link_libraries(crimson-common
 
 set(crimson_auth_srcs
   auth/KeyRing.cc)
+set(crimson_mgr_srcs
+  mgr/client.cc)
 set(crimson_mon_srcs
   mon/MonClient.cc
   ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
@@ -126,6 +128,7 @@ set(crimson_thread_srcs
   thread/Throttle.cc)
 add_library(crimson STATIC
   ${crimson_auth_srcs}
+  ${crimson_mgr_srcs}
   ${crimson_mon_srcs}
   ${crimson_net_srcs}
   ${crimson_thread_srcs}
diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc
new file mode 100644 (file)
index 0000000..bdce3f1
--- /dev/null
@@ -0,0 +1,130 @@
+#include "client.h"
+
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "messages/MMgrConfigure.h"
+#include "messages/MMgrMap.h"
+#include "messages/MMgrOpen.h"
+
+namespace {
+  seastar::logger& logger()
+  {
+    return ceph::get_logger(ceph_subsys_mgrc);
+  }
+  template<typename Message, typename... Args>
+  Ref<Message> make_message(Args&&... args)
+  {
+    // Message inherits from RefCountedObject, whose nref is 1 when it is
+    // constructed, so we pass "add_ref = false" to intrusive_ptr's ctor
+    return {new Message{std::forward<Args>(args)...}, false};
+  }
+}
+
+using ceph::common::local_conf;
+
+namespace ceph::mgr
+{
+
+Client::Client(ceph::net::Messenger& msgr,
+                 WithStats& with_stats)
+  : msgr{msgr},
+    with_stats{with_stats},
+    report_timer{[this] {report();}}
+{}
+
+seastar::future<> Client::start()
+{
+  return seastar::now();
+}
+
+seastar::future<> Client::stop()
+{
+  return gate.close().then([this] {
+    if (conn) {
+      return conn->close();
+    } else {
+      return seastar::now();
+    }
+  });
+}
+
+seastar::future<> Client::ms_dispatch(ceph::net::ConnectionRef conn,
+                                      MessageRef m)
+{
+  switch(m->get_type()) {
+  case MSG_MGR_MAP:
+    return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
+  case MSG_MGR_CONFIGURE:
+    return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
+  default:
+    return seastar::now();
+  }
+}
+
+seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef c)
+{
+  if (conn == c) {
+    return reconnect();
+  } else {
+    return seastar::now();
+  }
+}
+
+seastar::future<> Client::reconnect()
+{
+  return (conn ? conn->close() : seastar::now()).then([this] {
+    if (!mgrmap.get_available()) {
+      logger().warn("No active mgr available yet");
+      return seastar::now();
+    }
+    auto peer = mgrmap.get_active_addrs().legacy_addr();
+    return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then(
+      [this](auto xconn) {
+        conn = xconn->release();
+        // ask for the mgrconfigure message
+        auto m = make_message<MMgrOpen>();
+        m->daemon_name = local_conf()->name.get_id();
+        return conn->send(std::move(m));
+      });
+  });
+}
+
+seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef,
+                                         Ref<MMgrMap> m)
+{
+  mgrmap = m->get_map();
+  if (!conn) {
+    return reconnect();
+  } else if (conn->get_peer_addr() !=
+             mgrmap.get_active_addrs().legacy_addr()) {
+    return reconnect();
+  } else {
+    return seastar::now();
+  }
+}
+
+seastar::future<> Client::handle_mgr_conf(ceph::net::ConnectionRef conn,
+                                          Ref<MMgrConfigure> m)
+{
+  logger().info("{} {}", __func__, *m);
+  report_period = std::chrono::seconds{m->stats_period};
+  if (report_period.count() && !report_timer.armed() ) {
+    report();
+  }
+  return seastar::now();
+}
+
+void Client::report()
+{
+  seastar::with_gate(gate, [this] {
+    auto pg_stats = with_stats.get_stats();
+    return conn->send(std::move(pg_stats)).finally([this] {
+      if (report_period.count()) {
+        report_timer.arm(report_period);
+      }
+    });
+  });
+}
+
+}
diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h
new file mode 100644 (file)
index 0000000..016044b
--- /dev/null
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/timer.hh>
+
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+#include "mon/MgrMap.h"
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+namespace ceph::net {
+  class Messenger;
+}
+
+class MMgrMap;
+class MMgrConfigure;
+
+namespace ceph::mgr
+{
+
+// implement WithStats if you want to report stats to mgr periodically
+class WithStats {
+public:
+  // the method is not const, because the class sending stats might need to
+  // update a seq number every time it collects the stats
+  virtual MessageRef get_stats() = 0;
+  virtual ~WithStats() {}
+};
+
+class Client : public ceph::net::Dispatcher {
+public:
+  Client(ceph::net::Messenger& msgr,
+        WithStats& with_stats);
+  seastar::future<> start();
+  seastar::future<> stop();
+private:
+  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+                               Ref<Message> m) override;
+  seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
+  seastar::future<> handle_mgr_map(ceph::net::ConnectionRef conn,
+                                  Ref<MMgrMap> m);
+  seastar::future<> handle_mgr_conf(ceph::net::ConnectionRef conn,
+                                   Ref<MMgrConfigure> m);
+  seastar::future<> reconnect();
+  void report();
+
+private:
+  MgrMap mgrmap;
+  ceph::net::Messenger& msgr;
+  WithStats& with_stats;
+  ceph::net::ConnectionRef conn;
+  std::chrono::seconds report_period{0};
+  seastar::timer<seastar::lowres_clock> report_timer;
+  seastar::gate gate;
+};
+
+}