From 2207f4151ffd2eeac7df590fdfad26baeea4a52e Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 4 Mar 2019 17:13:29 +0800 Subject: [PATCH] crimson/mgr: add mgr client Signed-off-by: Kefu Chai --- src/crimson/CMakeLists.txt | 3 + src/crimson/mgr/client.cc | 130 +++++++++++++++++++++++++++++++++++++ src/crimson/mgr/client.h | 60 +++++++++++++++++ 3 files changed, 193 insertions(+) create mode 100644 src/crimson/mgr/client.cc create mode 100644 src/crimson/mgr/client.h diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 05d6f148f39..20c545dad17 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -112,6 +112,8 @@ target_link_libraries(crimson-common set(crimson_auth_srcs auth/KeyRing.cc) +set(crimson_mgr_srcs + mgr/client.cc) set(crimson_mon_srcs mon/MonClient.cc ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc) @@ -126,6 +128,7 @@ set(crimson_thread_srcs thread/Throttle.cc) add_library(crimson STATIC ${crimson_auth_srcs} + ${crimson_mgr_srcs} ${crimson_mon_srcs} ${crimson_net_srcs} ${crimson_thread_srcs} diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc new file mode 100644 index 00000000000..bdce3f14fa7 --- /dev/null +++ b/src/crimson/mgr/client.cc @@ -0,0 +1,130 @@ +#include "client.h" + +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "messages/MMgrConfigure.h" +#include "messages/MMgrMap.h" +#include "messages/MMgrOpen.h" + +namespace { + seastar::logger& logger() + { + return ceph::get_logger(ceph_subsys_mgrc); + } + template + Ref make_message(Args&&... args) + { + // Message inherits from RefCountedObject, whose nref is 1 when it is + // constructed, so we pass "add_ref = false" to intrusive_ptr's ctor + return {new Message{std::forward(args)...}, false}; + } +} + +using ceph::common::local_conf; + +namespace ceph::mgr +{ + +Client::Client(ceph::net::Messenger& msgr, + WithStats& with_stats) + : msgr{msgr}, + with_stats{with_stats}, + report_timer{[this] {report();}} +{} + +seastar::future<> Client::start() +{ + return seastar::now(); +} + +seastar::future<> Client::stop() +{ + return gate.close().then([this] { + if (conn) { + return conn->close(); + } else { + return seastar::now(); + } + }); +} + +seastar::future<> Client::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) +{ + switch(m->get_type()) { + case MSG_MGR_MAP: + return handle_mgr_map(conn, boost::static_pointer_cast(m)); + case MSG_MGR_CONFIGURE: + return handle_mgr_conf(conn, boost::static_pointer_cast(m)); + default: + return seastar::now(); + } +} + +seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef c) +{ + if (conn == c) { + return reconnect(); + } else { + return seastar::now(); + } +} + +seastar::future<> Client::reconnect() +{ + return (conn ? conn->close() : seastar::now()).then([this] { + if (!mgrmap.get_available()) { + logger().warn("No active mgr available yet"); + return seastar::now(); + } + auto peer = mgrmap.get_active_addrs().legacy_addr(); + return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then( + [this](auto xconn) { + conn = xconn->release(); + // ask for the mgrconfigure message + auto m = make_message(); + m->daemon_name = local_conf()->name.get_id(); + return conn->send(std::move(m)); + }); + }); +} + +seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef, + Ref m) +{ + mgrmap = m->get_map(); + if (!conn) { + return reconnect(); + } else if (conn->get_peer_addr() != + mgrmap.get_active_addrs().legacy_addr()) { + return reconnect(); + } else { + return seastar::now(); + } +} + +seastar::future<> Client::handle_mgr_conf(ceph::net::ConnectionRef conn, + Ref m) +{ + logger().info("{} {}", __func__, *m); + report_period = std::chrono::seconds{m->stats_period}; + if (report_period.count() && !report_timer.armed() ) { + report(); + } + return seastar::now(); +} + +void Client::report() +{ + seastar::with_gate(gate, [this] { + auto pg_stats = with_stats.get_stats(); + return conn->send(std::move(pg_stats)).finally([this] { + if (report_period.count()) { + report_timer.arm(report_period); + } + }); + }); +} + +} diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h new file mode 100644 index 00000000000..016044bebbb --- /dev/null +++ b/src/crimson/mgr/client.h @@ -0,0 +1,60 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" +#include "mon/MgrMap.h" + +template using Ref = boost::intrusive_ptr; +namespace ceph::net { + class Messenger; +} + +class MMgrMap; +class MMgrConfigure; + +namespace ceph::mgr +{ + +// implement WithStats if you want to report stats to mgr periodically +class WithStats { +public: + // the method is not const, because the class sending stats might need to + // update a seq number every time it collects the stats + virtual MessageRef get_stats() = 0; + virtual ~WithStats() {} +}; + +class Client : public ceph::net::Dispatcher { +public: + Client(ceph::net::Messenger& msgr, + WithStats& with_stats); + seastar::future<> start(); + seastar::future<> stop(); +private: + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + Ref m) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> handle_mgr_map(ceph::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_mgr_conf(ceph::net::ConnectionRef conn, + Ref m); + seastar::future<> reconnect(); + void report(); + +private: + MgrMap mgrmap; + ceph::net::Messenger& msgr; + WithStats& with_stats; + ceph::net::ConnectionRef conn; + std::chrono::seconds report_period{0}; + seastar::timer report_timer; + seastar::gate gate; +}; + +} -- 2.39.5