From: Jason Dillaman Date: Fri, 14 Jul 2017 18:28:04 +0000 (-0400) Subject: rbd-mirror: service daemon status formatter X-Git-Tag: v12.1.2~151^2~15 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=24ed9017cae1950efafaf7acab650e80969fb9e9;p=ceph.git rbd-mirror: service daemon status formatter Signed-off-by: Jason Dillaman --- diff --git a/src/test/rbd_mirror/test_ClusterWatcher.cc b/src/test/rbd_mirror/test_ClusterWatcher.cc index 61f67a5f2279..8aafbdb4fe32 100644 --- a/src/test/rbd_mirror/test_ClusterWatcher.cc +++ b/src/test/rbd_mirror/test_ClusterWatcher.cc @@ -35,10 +35,6 @@ public: { m_cluster = std::make_shared(); EXPECT_EQ("", connect_cluster_pp(*m_cluster)); - m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context, - m_cluster)); - m_cluster_watcher.reset(new ClusterWatcher(m_cluster, m_lock, - m_service_daemon.get())); } ~TestClusterWatcher() override { @@ -48,6 +44,21 @@ public: } } + void SetUp() override { + TestFixture::SetUp(); + m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context, + m_cluster, + m_threads)); + m_cluster_watcher.reset(new ClusterWatcher(m_cluster, m_lock, + m_service_daemon.get())); + } + + void TearDown() override { + m_service_daemon.reset(); + m_cluster_watcher.reset(); + TestFixture::TearDown(); + } + void create_pool(bool enable_mirroring, const peer_t &peer, string *uuid = nullptr, string *name=nullptr) { string pool_name = get_temp_pool_name("test-rbd-mirror-"); diff --git a/src/test/rbd_mirror/test_ImageDeleter.cc b/src/test/rbd_mirror/test_ImageDeleter.cc index 0f0b44fdb149..56f1dde032d5 100644 --- a/src/test/rbd_mirror/test_ImageDeleter.cc +++ b/src/test/rbd_mirror/test_ImageDeleter.cc @@ -63,7 +63,7 @@ public: void SetUp() override { TestFixture::SetUp(); m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context, - _rados)); + _rados, m_threads)); librbd::api::Mirror<>::mode_set(m_local_io_ctx, RBD_MIRROR_MODE_IMAGE); @@ -89,8 +89,10 @@ public: void TearDown() override { remove_image(); - TestFixture::TearDown(); delete m_deleter; + m_service_daemon.reset(); + + TestFixture::TearDown(); } void remove_image(bool force=false) { diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 810ebff41a7e..c94d2ccee3ee 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -113,11 +113,12 @@ public: false, features, &order, 0, 0)); m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name); - m_threads = new rbd::mirror::Threads<>(reinterpret_cast( - m_local_ioctx.cct())); + m_threads.reset(new rbd::mirror::Threads<>(reinterpret_cast( + m_local_ioctx.cct()))); m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context, - m_local_cluster)); + m_local_cluster, + m_threads.get())); m_image_deleter.reset(new rbd::mirror::ImageDeleter<>( m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, m_service_daemon.get())); @@ -134,7 +135,6 @@ public: delete m_replayer; delete m_instance_watcher; - delete m_threads; EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str())); EXPECT_EQ(0, m_local_cluster->pool_delete(m_local_pool_name.c_str())); @@ -143,7 +143,7 @@ public: template > void create_replayer() { m_replayer = new ImageReplayerT( - m_threads, m_image_deleter.get(), m_instance_watcher, + m_threads.get(), m_image_deleter.get(), m_instance_watcher, rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id"); m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id, @@ -369,10 +369,10 @@ public: static int _image_number; - rbd::mirror::Threads<> *m_threads = nullptr; - unique_ptr> m_service_daemon; - std::unique_ptr> m_image_deleter; std::shared_ptr m_local_cluster; + std::unique_ptr> m_threads; + std::unique_ptr> m_service_daemon; + std::unique_ptr> m_image_deleter; librados::Rados m_remote_cluster; rbd::mirror::InstanceWatcher<> *m_instance_watcher; std::string m_local_mirror_uuid = "local mirror uuid"; diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 914dfc7b8091..88ff456419c6 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -34,7 +34,8 @@ set(rbd_mirror_internal image_sync/SnapshotCreateRequest.cc image_sync/SyncPointCreateRequest.cc image_sync/SyncPointPruneRequest.cc - pool_watcher/RefreshImagesRequest.cc) + pool_watcher/RefreshImagesRequest.cc + service_daemon/Types.cc) add_library(rbd_mirror_internal STATIC ${rbd_mirror_internal}) diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 7852a4c088a7..b4509d5c465d 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -9,8 +9,8 @@ #include "common/errno.h" #include "librbd/ImageCtx.h" #include "Mirror.h" +#include "ServiceDaemon.h" #include "Threads.h" -#include "ImageSync.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -202,11 +202,11 @@ Mirror::Mirror(CephContext *cct, const std::vector &args) : m_args(args), m_lock("rbd::mirror::Mirror"), m_local(new librados::Rados()), - m_service_daemon(m_cct, m_local), m_asok_hook(new MirrorAdminSocketHook(cct, this)) { cct->lookup_or_create_singleton_object >( m_threads, "rbd_mirror::threads"); + m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads)); } Mirror::~Mirror() @@ -237,19 +237,19 @@ int Mirror::init() return r; } - r = m_service_daemon.init(); + r = m_service_daemon->init(); if (r < 0) { derr << "error registering service daemon: " << cpp_strerror(r) << dendl; return r; } m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock, - &m_service_daemon)); + m_service_daemon.get())); m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, - &m_service_daemon)); + m_service_daemon.get())); return r; } @@ -419,7 +419,7 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers) } else { dout(20) << "starting pool replayer for " << peer << dendl; unique_ptr pool_replayer(new PoolReplayer( - m_threads, &m_service_daemon, m_image_deleter.get(), kv.first, + m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first, peer, m_args)); // TODO: make async diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index 8558f82c821c..92f7eb4508a7 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -11,7 +11,6 @@ #include "PoolReplayer.h" #include "ImageDeleter.h" #include "types.h" -#include "ServiceDaemon.h" #include #include @@ -23,6 +22,7 @@ namespace librbd { struct ImageCtx; } namespace rbd { namespace mirror { +template struct ServiceDaemon; template struct Threads; class MirrorAdminSocketHook; @@ -62,7 +62,7 @@ private: Mutex m_lock; Cond m_cond; RadosRef m_local; - ServiceDaemon m_service_daemon; + std::unique_ptr> m_service_daemon; // monitor local cluster for config changes in peers std::unique_ptr m_local_cluster_watcher; diff --git a/src/tools/rbd_mirror/ServiceDaemon.cc b/src/tools/rbd_mirror/ServiceDaemon.cc index 2d0ecae07c64..2f4873912b92 100644 --- a/src/tools/rbd_mirror/ServiceDaemon.cc +++ b/src/tools/rbd_mirror/ServiceDaemon.cc @@ -1,11 +1,17 @@ - // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "tools/rbd_mirror/ServiceDaemon.h" +#include "include/Context.h" #include "include/stringify.h" #include "common/ceph_context.h" #include "common/config.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Formatter.h" +#include "common/Timer.h" +#include "tools/rbd_mirror/Threads.h" +#include #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -20,10 +26,51 @@ namespace { const std::string RBD_MIRROR_AUTH_ID_PREFIX("rbd-mirror."); +struct AttributeDumpVisitor : public boost::static_visitor { + ceph::Formatter *f; + const std::string& name; + + AttributeDumpVisitor(ceph::Formatter *f, const std::string& name) + : f(f), name(name) { + } + + void operator()(bool val) const { + f->dump_bool(name.c_str(), val); + } + void operator()(uint64_t val) const { + f->dump_unsigned(name.c_str(), val); + } + void operator()(const std::string& val) const { + f->dump_string(name.c_str(), val); + } +}; + } // anonymous namespace +using namespace service_daemon; + +template +ServiceDaemon::ServiceDaemon(CephContext *cct, RadosRef rados, + Threads* threads) + : m_cct(cct), m_rados(rados), m_threads(threads), + m_lock("rbd::mirror::ServiceDaemon") { + dout(20) << dendl; +} + +template +ServiceDaemon::~ServiceDaemon() { + dout(20) << dendl; + Mutex::Locker timer_locker(m_threads->timer_lock); + if (m_timer_ctx != nullptr) { + m_threads->timer->cancel_event(m_timer_ctx); + update_status(); + } +} + template int ServiceDaemon::init() { + dout(20) << dendl; + std::string name = m_cct->_conf->name.get_id(); if (name.find(RBD_MIRROR_AUTH_ID_PREFIX) == 0) { name = name.substr(RBD_MIRROR_AUTH_ID_PREFIX.size()); @@ -41,6 +88,163 @@ int ServiceDaemon::init() { return 0; } +template +void ServiceDaemon::add_pool(int64_t pool_id, const std::string& pool_name) { + dout(20) << "pool_id=" << pool_id << ", pool_name=" << pool_name << dendl; + + { + Mutex::Locker locker(m_lock); + m_pools.insert({pool_id, {pool_name}}); + } + schedule_update_status(); +} + +template +void ServiceDaemon::remove_pool(int64_t pool_id) { + dout(20) << "pool_id=" << pool_id << dendl; + { + Mutex::Locker locker(m_lock); + m_pools.erase(pool_id); + } + schedule_update_status(); +} + +template +uint64_t ServiceDaemon::add_or_update_callout(int64_t pool_id, + uint64_t callout_id, + CalloutLevel callout_level, + const std::string& text) { + dout(20) << "pool_id=" << pool_id << ", " + << "callout_id=" << callout_id << ", " + << "callout_level=" << callout_level << ", " + << "text=" << text << dendl; + + { + Mutex::Locker locker(m_lock); + auto pool_it = m_pools.find(pool_id); + if (pool_it == m_pools.end()) { + return CALLOUT_ID_NONE; + } + + if (callout_id == CALLOUT_ID_NONE) { + callout_id = ++m_callout_id; + } + pool_it->second.callouts[callout_id] = {callout_level, text}; + } + + schedule_update_status(); + return callout_id; +} + +template +void ServiceDaemon::remove_callout(int64_t pool_id, uint64_t callout_id) { + dout(20) << "pool_id=" << pool_id << ", " + << "callout_id=" << callout_id << dendl; + + { + Mutex::Locker locker(m_lock); + auto pool_it = m_pools.find(pool_id); + if (pool_it == m_pools.end()) { + return; + } + pool_it->second.callouts.erase(callout_id); + } + + schedule_update_status(); +} + +template +void ServiceDaemon::add_or_update_attribute(int64_t pool_id, + const std::string& key, + const AttributeValue& value) { + dout(20) << "pool_id=" << pool_id << ", " + << "key=" << key << ", " + << "value=" << value << dendl; + + { + Mutex::Locker locker(m_lock); + auto pool_it = m_pools.find(pool_id); + if (pool_it == m_pools.end()) { + return; + } + pool_it->second.attributes[key] = value; + } + + schedule_update_status(); +} + +template +void ServiceDaemon::remove_attribute(int64_t pool_id, + const std::string& key) { + dout(20) << "pool_id=" << pool_id << ", " + << "key=" << key << dendl; + + { + Mutex::Locker locker(m_lock); + auto pool_it = m_pools.find(pool_id); + if (pool_it == m_pools.end()) { + return; + } + pool_it->second.attributes.erase(key); + } + + schedule_update_status(); +} + +template +void ServiceDaemon::schedule_update_status() { + Mutex::Locker timer_locker(m_threads->timer_lock); + if (m_timer_ctx != nullptr) { + return; + } + + m_timer_ctx = new FunctionContext([this](int) { + m_timer_ctx = nullptr; + update_status(); + }); + m_threads->timer->add_event_after(1, m_timer_ctx); +} + +template +void ServiceDaemon::update_status() { + dout(20) << dendl; + assert(m_threads->timer_lock.is_locked()); + + ceph::JSONFormatter f; + { + Mutex::Locker locker(m_lock); + f.open_object_section("pools"); + for (auto& pool_pair : m_pools) { + f.open_object_section(stringify(pool_pair.first).c_str()); + f.dump_string("name", pool_pair.second.name); + f.open_object_section("callouts"); + for (auto& callout : pool_pair.second.callouts) { + f.open_object_section(stringify(callout.first).c_str()); + f.dump_string("level", stringify(callout.second.level).c_str()); + f.dump_string("text", callout.second.text.c_str()); + f.close_section(); + } + f.close_section(); // callouts + + for (auto& attribute : pool_pair.second.attributes) { + AttributeDumpVisitor attribute_dump_visitor(&f, attribute.first); + boost::apply_visitor(attribute_dump_visitor, attribute.second); + } + f.close_section(); // pool + } + f.close_section(); // pools + } + + std::stringstream ss; + f.flush(ss); + + int r = m_rados->service_daemon_update_status({{"json", ss.str()}}); + if (r < 0) { + derr << "failed to update service daemon status: " << cpp_strerror(r) + << dendl; + } +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/ServiceDaemon.h b/src/tools/rbd_mirror/ServiceDaemon.h index 73c3b9b17fa8..effea4fd745c 100644 --- a/src/tools/rbd_mirror/ServiceDaemon.h +++ b/src/tools/rbd_mirror/ServiceDaemon.h @@ -1,32 +1,81 @@ - // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef CEPH_RBD_MIRROR_SERVICE_DAEMON_H #define CEPH_RBD_MIRROR_SERVICE_DAEMON_H +#include "common/Mutex.h" #include "tools/rbd_mirror/types.h" +#include "tools/rbd_mirror/service_daemon/Types.h" +#include #include struct CephContext; +struct Context; namespace librbd { struct ImageCtx; } namespace rbd { namespace mirror { +template struct Threads; + template class ServiceDaemon { public: - ServiceDaemon(CephContext *cct, RadosRef rados) - : m_cct(cct), m_rados(rados) { - } + ServiceDaemon(CephContext *cct, RadosRef rados, Threads* threads); + ~ServiceDaemon(); int init(); + void add_pool(int64_t pool_id, const std::string& pool_name); + void remove_pool(int64_t pool_id); + + uint64_t add_or_update_callout(int64_t pool_id, uint64_t callout_id, + service_daemon::CalloutLevel callout_level, + const std::string& text); + void remove_callout(int64_t pool_id, uint64_t callout_id); + + void add_or_update_attribute(int64_t pool_id, const std::string& key, + const service_daemon::AttributeValue& value); + void remove_attribute(int64_t pool_id, const std::string& key); + private: + struct Callout { + service_daemon::CalloutLevel level; + std::string text; + + Callout() : level(service_daemon::CALLOUT_LEVEL_INFO) { + } + Callout(service_daemon::CalloutLevel level, const std::string& text) + : level(level), text(text) { + } + }; + typedef std::map Callouts; + typedef std::map Attributes; + + struct Pool { + std::string name; + Callouts callouts; + Attributes attributes; + + Pool(const std::string& name) : name(name) { + } + }; + + typedef std::map Pools; + CephContext *m_cct; RadosRef m_rados; + Threads* m_threads; + + Mutex m_lock; + Pools m_pools; + uint64_t m_callout_id = service_daemon::CALLOUT_ID_NONE; + + Context* m_timer_ctx; + void schedule_update_status(); + void update_status(); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/service_daemon/Types.cc b/src/tools/rbd_mirror/service_daemon/Types.cc new file mode 100644 index 000000000000..7dc6537c534a --- /dev/null +++ b/src/tools/rbd_mirror/service_daemon/Types.cc @@ -0,0 +1,29 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "tools/rbd_mirror/service_daemon/Types.h" +#include + +namespace rbd { +namespace mirror { +namespace service_daemon { + +std::ostream& operator<<(std::ostream& os, const CalloutLevel& callout_level) { + switch (callout_level) { + case CALLOUT_LEVEL_INFO: + os << "info"; + break; + case CALLOUT_LEVEL_WARNING: + os << "warning"; + break; + case CALLOUT_LEVEL_ERROR: + os << "error"; + break; + } + return os; +} + +} // namespace service_daemon +} // namespace mirror +} // namespace rbd + diff --git a/src/tools/rbd_mirror/service_daemon/Types.h b/src/tools/rbd_mirror/service_daemon/Types.h new file mode 100644 index 000000000000..3aab7201614f --- /dev/null +++ b/src/tools/rbd_mirror/service_daemon/Types.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_SERVICE_DAEMON_TYPES_H +#define CEPH_RBD_MIRROR_SERVICE_DAEMON_TYPES_H + +#include "include/int_types.h" +#include +#include +#include + +namespace rbd { +namespace mirror { +namespace service_daemon { + +typedef uint64_t CalloutId; +const uint64_t CALLOUT_ID_NONE {0}; + +enum CalloutLevel { + CALLOUT_LEVEL_INFO, + CALLOUT_LEVEL_WARNING, + CALLOUT_LEVEL_ERROR +}; + +std::ostream& operator<<(std::ostream& os, const CalloutLevel& callout_level); + +typedef boost::variant AttributeValue; + +} // namespace service_daemon +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_SERVICE_DAEMON_TYPES_H