]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: service daemon status formatter
authorJason Dillaman <dillaman@redhat.com>
Fri, 14 Jul 2017 18:28:04 +0000 (14:28 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 18 Jul 2017 14:28:14 +0000 (10:28 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_ClusterWatcher.cc
src/test/rbd_mirror/test_ImageDeleter.cc
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/ServiceDaemon.cc
src/tools/rbd_mirror/ServiceDaemon.h
src/tools/rbd_mirror/service_daemon/Types.cc [new file with mode: 0644]
src/tools/rbd_mirror/service_daemon/Types.h [new file with mode: 0644]

index 61f67a5f2279b546520a01eaab7880a660734b36..8aafbdb4fe32f36681185563d20a62d35a71cc60 100644 (file)
@@ -35,10 +35,6 @@ public:
   {
     m_cluster = std::make_shared<librados::Rados>();
     EXPECT_EQ("", connect_cluster_pp(*m_cluster));
-    m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context,
-                                                            m_cluster));
-    m_cluster_watcher.reset(new ClusterWatcher(m_cluster, m_lock,
-                                               m_service_daemon.get()));
   }
 
   ~TestClusterWatcher() override {
@@ -48,6 +44,21 @@ public:
     }
   }
 
+  void SetUp() override {
+    TestFixture::SetUp();
+    m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context,
+                                                            m_cluster,
+                                                            m_threads));
+    m_cluster_watcher.reset(new ClusterWatcher(m_cluster, m_lock,
+                                               m_service_daemon.get()));
+  }
+
+  void TearDown() override {
+    m_service_daemon.reset();
+    m_cluster_watcher.reset();
+    TestFixture::TearDown();
+  }
+
   void create_pool(bool enable_mirroring, const peer_t &peer,
                    string *uuid = nullptr, string *name=nullptr) {
     string pool_name = get_temp_pool_name("test-rbd-mirror-");
index 0f0b44fdb149bc7f0101fe363ce0526e56952fe0..56f1dde032d5337221638e4dd582c1f5131c4bf7 100644 (file)
@@ -63,7 +63,7 @@ public:
   void SetUp() override {
     TestFixture::SetUp();
     m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context,
-                                                            _rados));
+                                                            _rados, m_threads));
 
     librbd::api::Mirror<>::mode_set(m_local_io_ctx, RBD_MIRROR_MODE_IMAGE);
 
@@ -89,8 +89,10 @@ public:
 
   void TearDown() override {
     remove_image();
-    TestFixture::TearDown();
     delete m_deleter;
+    m_service_daemon.reset();
+
+    TestFixture::TearDown();
   }
 
   void remove_image(bool force=false) {
index 810ebff41a7ebab00ec63f78d44d65ea3ca1a043..c94d2ccee3ee1cccc4ab8fef18074eb8aedea314 100644 (file)
@@ -113,11 +113,12 @@ public:
                                false, features, &order, 0, 0));
     m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
 
-    m_threads = new rbd::mirror::Threads<>(reinterpret_cast<CephContext*>(
-      m_local_ioctx.cct()));
+    m_threads.reset(new rbd::mirror::Threads<>(reinterpret_cast<CephContext*>(
+      m_local_ioctx.cct())));
 
     m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context,
-                                                            m_local_cluster));
+                                                            m_local_cluster,
+                                                            m_threads.get()));
     m_image_deleter.reset(new rbd::mirror::ImageDeleter<>(
       m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
       m_service_daemon.get()));
@@ -134,7 +135,6 @@ public:
 
     delete m_replayer;
     delete m_instance_watcher;
-    delete m_threads;
 
     EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
     EXPECT_EQ(0, m_local_cluster->pool_delete(m_local_pool_name.c_str()));
@@ -143,7 +143,7 @@ public:
   template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
   void create_replayer() {
     m_replayer = new ImageReplayerT(
-        m_threads, m_image_deleter.get(), m_instance_watcher,
+        m_threads.get(), m_image_deleter.get(), m_instance_watcher,
         rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
         m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
     m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id,
@@ -369,10 +369,10 @@ public:
 
   static int _image_number;
 
-  rbd::mirror::Threads<> *m_threads = nullptr;
-  unique_ptr<rbd::mirror::ServiceDaemon<>> m_service_daemon;
-  std::unique_ptr<rbd::mirror::ImageDeleter<>> m_image_deleter;
   std::shared_ptr<librados::Rados> m_local_cluster;
+  std::unique_ptr<rbd::mirror::Threads<>> m_threads;
+  std::unique_ptr<rbd::mirror::ServiceDaemon<>> m_service_daemon;
+  std::unique_ptr<rbd::mirror::ImageDeleter<>> m_image_deleter;
   librados::Rados m_remote_cluster;
   rbd::mirror::InstanceWatcher<> *m_instance_watcher;
   std::string m_local_mirror_uuid = "local mirror uuid";
index 914dfc7b80912febb24d1383f55db4fbcdd330db..88ff456419c61cc591748780df61362557cd765d 100644 (file)
@@ -34,7 +34,8 @@ set(rbd_mirror_internal
   image_sync/SnapshotCreateRequest.cc
   image_sync/SyncPointCreateRequest.cc
   image_sync/SyncPointPruneRequest.cc
-  pool_watcher/RefreshImagesRequest.cc)
+  pool_watcher/RefreshImagesRequest.cc
+  service_daemon/Types.cc)
 add_library(rbd_mirror_internal STATIC
   ${rbd_mirror_internal})
 
index 7852a4c088a7958e789e563b52ae56179c0a6120..b4509d5c465d5cb725aa9d2e3ab4a4a1c51bd9fc 100644 (file)
@@ -9,8 +9,8 @@
 #include "common/errno.h"
 #include "librbd/ImageCtx.h"
 #include "Mirror.h"
+#include "ServiceDaemon.h"
 #include "Threads.h"
-#include "ImageSync.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -202,11 +202,11 @@ Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
   m_args(args),
   m_lock("rbd::mirror::Mirror"),
   m_local(new librados::Rados()),
-  m_service_daemon(m_cct, m_local),
   m_asok_hook(new MirrorAdminSocketHook(cct, this))
 {
   cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
     m_threads, "rbd_mirror::threads");
+  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
 }
 
 Mirror::~Mirror()
@@ -237,19 +237,19 @@ int Mirror::init()
     return r;
   }
 
-  r = m_service_daemon.init();
+  r = m_service_daemon->init();
   if (r < 0) {
     derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
     return r;
   }
 
   m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
-                                                   &m_service_daemon));
+                                                   m_service_daemon.get()));
 
   m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue,
                                            m_threads->timer,
                                            &m_threads->timer_lock,
-                                           &m_service_daemon));
+                                           m_service_daemon.get()));
   return r;
 }
 
@@ -419,7 +419,7 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
       } else {
         dout(20) << "starting pool replayer for " << peer << dendl;
         unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
-         m_threads, &m_service_daemon, m_image_deleter.get(), kv.first,
+         m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first,
           peer, m_args));
 
         // TODO: make async
index 8558f82c821cc92473ee3c3ce88adea519a67c10..92f7eb4508a7c4429fe1968ba198efbb6835f177 100644 (file)
@@ -11,7 +11,6 @@
 #include "PoolReplayer.h"
 #include "ImageDeleter.h"
 #include "types.h"
-#include "ServiceDaemon.h"
 
 #include <set>
 #include <map>
@@ -23,6 +22,7 @@ namespace librbd { struct ImageCtx; }
 namespace rbd {
 namespace mirror {
 
+template <typename> struct ServiceDaemon;
 template <typename> struct Threads;
 class MirrorAdminSocketHook;
 
@@ -62,7 +62,7 @@ private:
   Mutex m_lock;
   Cond m_cond;
   RadosRef m_local;
-  ServiceDaemon<librbd::ImageCtx> m_service_daemon;
+  std::unique_ptr<ServiceDaemon<librbd::ImageCtx>> m_service_daemon;
 
   // monitor local cluster for config changes in peers
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
index 2d0ecae07c643763342738556b8ee10cee730c02..2f4873912b923eb119ccda0f82bfad191359c8e6 100644 (file)
@@ -1,11 +1,17 @@
-
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
 #include "tools/rbd_mirror/ServiceDaemon.h"
+#include "include/Context.h"
 #include "include/stringify.h"
 #include "common/ceph_context.h"
 #include "common/config.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Formatter.h"
+#include "common/Timer.h"
+#include "tools/rbd_mirror/Threads.h"
+#include <sstream>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -20,10 +26,51 @@ namespace {
 
 const std::string RBD_MIRROR_AUTH_ID_PREFIX("rbd-mirror.");
 
+struct AttributeDumpVisitor : public boost::static_visitor<void> {
+  ceph::Formatter *f;
+  const std::string& name;
+
+  AttributeDumpVisitor(ceph::Formatter *f, const std::string& name)
+    : f(f), name(name) {
+  }
+
+  void operator()(bool val) const {
+    f->dump_bool(name.c_str(), val);
+  }
+  void operator()(uint64_t val) const {
+    f->dump_unsigned(name.c_str(), val);
+  }
+  void operator()(const std::string& val) const {
+    f->dump_string(name.c_str(), val);
+  }
+};
+
 } // anonymous namespace
 
+using namespace service_daemon;
+
+template <typename I>
+ServiceDaemon<I>::ServiceDaemon(CephContext *cct, RadosRef rados,
+                                Threads<I>* threads)
+  : m_cct(cct), m_rados(rados), m_threads(threads),
+    m_lock("rbd::mirror::ServiceDaemon") {
+  dout(20) << dendl;
+}
+
+template <typename I>
+ServiceDaemon<I>::~ServiceDaemon() {
+  dout(20) << dendl;
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  if (m_timer_ctx != nullptr) {
+    m_threads->timer->cancel_event(m_timer_ctx);
+    update_status();
+  }
+}
+
 template <typename I>
 int ServiceDaemon<I>::init() {
+  dout(20) << dendl;
+
   std::string name = m_cct->_conf->name.get_id();
   if (name.find(RBD_MIRROR_AUTH_ID_PREFIX) == 0) {
     name = name.substr(RBD_MIRROR_AUTH_ID_PREFIX.size());
@@ -41,6 +88,163 @@ int ServiceDaemon<I>::init() {
   return 0;
 }
 
+template <typename I>
+void ServiceDaemon<I>::add_pool(int64_t pool_id, const std::string& pool_name) {
+  dout(20) << "pool_id=" << pool_id << ", pool_name=" << pool_name << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    m_pools.insert({pool_id, {pool_name}});
+  }
+  schedule_update_status();
+}
+
+template <typename I>
+void ServiceDaemon<I>::remove_pool(int64_t pool_id) {
+  dout(20) << "pool_id=" << pool_id << dendl;
+  {
+    Mutex::Locker locker(m_lock);
+    m_pools.erase(pool_id);
+  }
+  schedule_update_status();
+}
+
+template <typename I>
+uint64_t ServiceDaemon<I>::add_or_update_callout(int64_t pool_id,
+                                                 uint64_t callout_id,
+                                                 CalloutLevel callout_level,
+                                                 const std::string& text) {
+  dout(20) << "pool_id=" << pool_id << ", "
+           << "callout_id=" << callout_id << ", "
+           << "callout_level=" << callout_level << ", "
+           << "text=" << text << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    auto pool_it = m_pools.find(pool_id);
+    if (pool_it == m_pools.end()) {
+      return CALLOUT_ID_NONE;
+    }
+
+    if (callout_id == CALLOUT_ID_NONE) {
+      callout_id = ++m_callout_id;
+    }
+    pool_it->second.callouts[callout_id] = {callout_level, text};
+  }
+
+  schedule_update_status();
+  return callout_id;
+}
+
+template <typename I>
+void ServiceDaemon<I>::remove_callout(int64_t pool_id, uint64_t callout_id) {
+  dout(20) << "pool_id=" << pool_id << ", "
+           << "callout_id=" << callout_id << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    auto pool_it = m_pools.find(pool_id);
+    if (pool_it == m_pools.end()) {
+      return;
+    }
+    pool_it->second.callouts.erase(callout_id);
+  }
+
+  schedule_update_status();
+}
+
+template <typename I>
+void ServiceDaemon<I>::add_or_update_attribute(int64_t pool_id,
+                                               const std::string& key,
+                                               const AttributeValue& value) {
+  dout(20) << "pool_id=" << pool_id << ", "
+           << "key=" << key << ", "
+           << "value=" << value << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    auto pool_it = m_pools.find(pool_id);
+    if (pool_it == m_pools.end()) {
+      return;
+    }
+    pool_it->second.attributes[key] = value;
+  }
+
+  schedule_update_status();
+}
+
+template <typename I>
+void ServiceDaemon<I>::remove_attribute(int64_t pool_id,
+                                        const std::string& key) {
+  dout(20) << "pool_id=" << pool_id << ", "
+           << "key=" << key << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    auto pool_it = m_pools.find(pool_id);
+    if (pool_it == m_pools.end()) {
+      return;
+    }
+    pool_it->second.attributes.erase(key);
+  }
+
+  schedule_update_status();
+}
+
+template <typename I>
+void ServiceDaemon<I>::schedule_update_status() {
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  if (m_timer_ctx != nullptr) {
+    return;
+  }
+
+  m_timer_ctx = new FunctionContext([this](int) {
+      m_timer_ctx = nullptr;
+      update_status();
+    });
+  m_threads->timer->add_event_after(1, m_timer_ctx);
+}
+
+template <typename I>
+void ServiceDaemon<I>::update_status() {
+  dout(20) << dendl;
+  assert(m_threads->timer_lock.is_locked());
+
+  ceph::JSONFormatter f;
+  {
+    Mutex::Locker locker(m_lock);
+    f.open_object_section("pools");
+    for (auto& pool_pair : m_pools) {
+      f.open_object_section(stringify(pool_pair.first).c_str());
+      f.dump_string("name", pool_pair.second.name);
+      f.open_object_section("callouts");
+      for (auto& callout : pool_pair.second.callouts) {
+        f.open_object_section(stringify(callout.first).c_str());
+        f.dump_string("level", stringify(callout.second.level).c_str());
+        f.dump_string("text", callout.second.text.c_str());
+        f.close_section();
+      }
+      f.close_section(); // callouts
+
+      for (auto& attribute : pool_pair.second.attributes) {
+        AttributeDumpVisitor attribute_dump_visitor(&f, attribute.first);
+        boost::apply_visitor(attribute_dump_visitor, attribute.second);
+      }
+      f.close_section(); // pool
+    }
+    f.close_section(); // pools
+  }
+
+  std::stringstream ss;
+  f.flush(ss);
+
+  int r = m_rados->service_daemon_update_status({{"json", ss.str()}});
+  if (r < 0) {
+    derr << "failed to update service daemon status: " << cpp_strerror(r)
+         << dendl;
+  }
+}
+
 } // namespace mirror
 } // namespace rbd
 
index 73c3b9b17fa8947e7fbbcf72e1c56fd07d21be20..effea4fd745c5a3bbab580d93b3043d7e0b036ef 100644 (file)
@@ -1,32 +1,81 @@
-
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
 #ifndef CEPH_RBD_MIRROR_SERVICE_DAEMON_H
 #define CEPH_RBD_MIRROR_SERVICE_DAEMON_H
 
+#include "common/Mutex.h"
 #include "tools/rbd_mirror/types.h"
+#include "tools/rbd_mirror/service_daemon/Types.h"
+#include <map>
 #include <string>
 
 struct CephContext;
+struct Context;
 namespace librbd { struct ImageCtx; }
 
 namespace rbd {
 namespace mirror {
 
+template <typename> struct Threads;
+
 template <typename ImageCtxT = librbd::ImageCtx>
 class ServiceDaemon {
 public:
-  ServiceDaemon(CephContext *cct, RadosRef rados)
-    : m_cct(cct), m_rados(rados) {
-  }
+  ServiceDaemon(CephContext *cct, RadosRef rados, Threads<ImageCtxT>* threads);
+  ~ServiceDaemon();
 
   int init();
 
+  void add_pool(int64_t pool_id, const std::string& pool_name);
+  void remove_pool(int64_t pool_id);
+
+  uint64_t add_or_update_callout(int64_t pool_id, uint64_t callout_id,
+                                 service_daemon::CalloutLevel callout_level,
+                                 const std::string& text);
+  void remove_callout(int64_t pool_id, uint64_t callout_id);
+
+  void add_or_update_attribute(int64_t pool_id, const std::string& key,
+                               const service_daemon::AttributeValue& value);
+  void remove_attribute(int64_t pool_id, const std::string& key);
+
 private:
+  struct Callout {
+    service_daemon::CalloutLevel level;
+    std::string text;
+
+    Callout() : level(service_daemon::CALLOUT_LEVEL_INFO) {
+    }
+    Callout(service_daemon::CalloutLevel level, const std::string& text)
+      : level(level), text(text) {
+    }
+  };
+  typedef std::map<uint64_t, Callout> Callouts;
+  typedef std::map<std::string, service_daemon::AttributeValue> Attributes;
+
+  struct Pool {
+    std::string name;
+    Callouts callouts;
+    Attributes attributes;
+
+    Pool(const std::string& name) : name(name) {
+    }
+  };
+
+  typedef std::map<int64_t, Pool> Pools;
+
   CephContext *m_cct;
   RadosRef m_rados;
+  Threads<ImageCtxT>* m_threads;
+
+  Mutex m_lock;
+  Pools m_pools;
+  uint64_t m_callout_id = service_daemon::CALLOUT_ID_NONE;
+
+  Context* m_timer_ctx;
 
+  void schedule_update_status();
+  void update_status();
 };
 
 } // namespace mirror
diff --git a/src/tools/rbd_mirror/service_daemon/Types.cc b/src/tools/rbd_mirror/service_daemon/Types.cc
new file mode 100644 (file)
index 0000000..7dc6537
--- /dev/null
@@ -0,0 +1,29 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tools/rbd_mirror/service_daemon/Types.h"
+#include <iostream>
+
+namespace rbd {
+namespace mirror {
+namespace service_daemon {
+
+std::ostream& operator<<(std::ostream& os, const CalloutLevel& callout_level) {
+  switch (callout_level) {
+  case CALLOUT_LEVEL_INFO:
+    os << "info";
+    break;
+  case CALLOUT_LEVEL_WARNING:
+    os << "warning";
+    break;
+  case CALLOUT_LEVEL_ERROR:
+    os << "error";
+    break;
+  }
+  return os;
+}
+
+} // namespace service_daemon
+} // namespace mirror
+} // namespace rbd
+
diff --git a/src/tools/rbd_mirror/service_daemon/Types.h b/src/tools/rbd_mirror/service_daemon/Types.h
new file mode 100644 (file)
index 0000000..3aab720
--- /dev/null
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_SERVICE_DAEMON_TYPES_H
+#define CEPH_RBD_MIRROR_SERVICE_DAEMON_TYPES_H
+
+#include "include/int_types.h"
+#include <iosfwd>
+#include <string>
+#include <boost/variant.hpp>
+
+namespace rbd {
+namespace mirror {
+namespace service_daemon {
+
+typedef uint64_t CalloutId;
+const uint64_t CALLOUT_ID_NONE {0};
+
+enum CalloutLevel {
+  CALLOUT_LEVEL_INFO,
+  CALLOUT_LEVEL_WARNING,
+  CALLOUT_LEVEL_ERROR
+};
+
+std::ostream& operator<<(std::ostream& os, const CalloutLevel& callout_level);
+
+typedef boost::variant<bool, uint64_t, std::string> AttributeValue;
+
+} // namespace service_daemon
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_SERVICE_DAEMON_TYPES_H