]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: Add `messenger dump <name>` asok command
authorMarcel Lauhoff <marcel.lauhoff@clyso.com>
Fri, 20 Sep 2024 17:33:07 +0000 (19:33 +0200)
committerMarcel Lauhoff <marcel.lauhoff@clyso.com>
Fri, 28 Feb 2025 15:47:02 +0000 (16:47 +0100)
Create admin socket command `messenger dump <name> [filter]
[--tcp-info]` on messenger initialization. `<name>` is the human
readable messenger name (e.g "client", "ms_objecter"). The filter
argument allows partial dumping. `tcp_info` controls inclusion of
TCP_INFO stats. Calling `messenger dump` without a messenger name
returns a list of messengers.

Signed-off-by: Marcel Lauhoff <marcel.lauhoff@clyso.com>
src/common/ceph_context.cc
src/common/ceph_context.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/test/msgr/test_msgr.cc

index 4bf1086e31d9fdf3b3b498abe39fbfc7b2b78934..0769eb99ca79253b1591ae71ca94625dde0f1cba 100644 (file)
@@ -713,6 +713,7 @@ CephContext::CephContext(uint32_t module_type_,
 #ifdef CEPH_DEBUG_MUTEX
     _lockdep_obs(NULL),
 #endif
+    _msgr_hook(nullptr),
     crush_location(this)
 {
   if (options.create_log) {
@@ -775,6 +776,17 @@ CephContext::CephContext(uint32_t module_type_,
   lookup_or_create_singleton_object<MempoolObs>("mempool_obs", false, this);
 }
 
+void CephContext::modify_msgr_hook(
+    std::function<AdminSocketHook*(void)> create,
+    std::function<void(AdminSocketHook*)> add) {
+  std::lock_guard l{_msgr_hook_lock};
+  if (_msgr_hook) {
+    add(_msgr_hook.get());
+  } else {
+    _msgr_hook.reset(create());
+  }
+}
+
 CephContext::~CephContext()
 {
   associated_objs.clear();
@@ -788,6 +800,9 @@ CephContext::~CephContext()
 
   delete _plugin_registry;
 
+  if (_msgr_hook) {
+    _admin_socket->unregister_commands(_msgr_hook.get());
+  }
   _admin_socket->unregister_commands(_admin_hook);
   delete _admin_hook;
   delete _admin_socket;
index 6a02d5c5bf1fcc93cd31295f8e7c7271423b211e..62c82d09b6c151ff5b424033caf5afe65f50e13f 100644 (file)
@@ -47,6 +47,7 @@
 #include "crush/CrushLocation.h"
 
 class AdminSocket;
+class AdminSocketHook;
 class CryptoHandler;
 class CryptoRandom;
 class MonMap;
@@ -381,8 +382,13 @@ private:
 #ifdef CEPH_DEBUG_MUTEX
   md_config_obs_t *_lockdep_obs;
 #endif
+
+  std::unique_ptr<AdminSocketHook> _msgr_hook;
+  ceph::mutex _msgr_hook_lock = ceph::make_mutex("CephContext::msgr_hook");
 public:
   TOPNSPC::crush::CrushLocation crush_location;
+  void modify_msgr_hook(std::function<AdminSocketHook*(void)> create,
+                       std::function<void(AdminSocketHook*)> add);
 private:
 
   enum {
index 34c7f6477871e52901107464377ec0fe392bd3fa..82fcce93e9a5f694deb3a1b1c5620ea29f285074 100644 (file)
 
 #include "acconfig.h"
 
+#include <algorithm>
 #include <iostream>
 #include <fstream>
+#include <iterator>
 
 #include "AsyncMessenger.h"
 #include <utime.h>
+#include <errno.h>
 
 #include "common/Clock.h"
 #include "common/Formatter.h"
+#include "common/cmdparse.h"
 #include "common/config.h"
 #include "common/Timer.h"
 #include "common/errno.h"
@@ -276,6 +280,84 @@ class C_handle_reap : public EventCallback {
   }
 };
 
+/*******************
+ * Admin Socket Hook
+ */
+
+AsyncMessengerSocketHook::AsyncMessengerSocketHook(
+    AsyncMessenger& m, const std::string& name)
+    : m_msgrs{{name, &m}} {}
+
+int AsyncMessengerSocketHook::call(
+    std::string_view command, const cmdmap_t& cmdmap, const bufferlist&,
+    Formatter* f, std::ostream& errss, ceph::buffer::list& out) {
+  if (command == "messenger dump") {
+    std::string name;
+    if (common::cmd_getval(cmdmap, "msgr", name)) {
+      if (auto it = m_msgrs.find(name); it != m_msgrs.end()) {
+        std::vector<std::string> opts;
+        const bool tcp_info =
+            common::cmd_getval_or<bool>(cmdmap, "tcp_info", false);
+        const bool has_filter =
+            common::cmd_getval(cmdmap, "dumpcontents", opts);
+        const std::set<std::string> optset(opts.begin(), opts.end());
+        const bool all = !has_filter || optset.contains("all");
+        const auto filter_fn = [&](const std::string& key) {
+          if (key == "tcp_info") {
+            return tcp_info;
+          } else if (all) {
+            return true;
+          } else {
+            return optset.contains(key);
+          }
+        };
+
+        const auto msgr = (*it).second;
+        f->open_object_section("status");
+       f->dump_string("name", name);
+        f->open_object_section("messenger");
+        msgr->dump(f, filter_fn);
+        f->close_section();  // messenger
+        f->close_section();  // status
+        return 0;
+      } else {
+        return -ENOENT;
+      }
+    } else {
+      f->open_object_section("status");
+      f->open_array_section("messengers");
+      for (const auto& [name, _] : m_msgrs) {
+        f->dump_string("name", name);
+      }
+      f->close_section();
+      f->close_section();
+      return 0;
+    }
+  }
+  return -ENOSYS;
+}
+
+void AsyncMessengerSocketHook::add_messenger(
+    const std::string& name, AsyncMessenger& msgr) {
+  m_msgrs.try_emplace(name, &msgr);
+}
+
+void AsyncMessengerSocketHook::remove_messenger(
+    AsyncMessenger& msgr) {
+  for (auto it = m_msgrs.begin(); it != m_msgrs.end(); ++it) {
+    if (&msgr == it->second) {
+      m_msgrs.erase(it);
+      break;
+    }
+  }
+}
+std::list<std::string> AsyncMessengerSocketHook::messengers() const {
+  std::list<std::string> result;
+  std::transform(m_msgrs.begin(), m_msgrs.end(), std::back_inserter(result),
+                [](const auto& pair) { return pair.first; });
+  return result;
+}
+
 /*******************
  * AsyncMessenger
  */
@@ -307,6 +389,29 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     processor_num = stack->get_num_worker();
   for (unsigned i = 0; i < processor_num; ++i)
     processors.push_back(new Processor(this, stack->get_worker(i), cct));
+
+  cct->modify_msgr_hook(
+      [&]() {
+       auto hook = new AsyncMessengerSocketHook(*this, mname);
+       const int asok_ret = cct->get_admin_socket()->register_command(
+           AsyncMessengerSocketHook::COMMAND, hook,
+           "dump messenger status");
+       if (asok_ret != 0) {
+         ldout(cct, 0) << __func__ << " messenger asok command \""
+                             << AsyncMessengerSocketHook::COMMAND
+                             << "\" failed with" << asok_ret << dendl;
+       }
+       return hook;
+      },
+      [&](AdminSocketHook* ptr) {
+       if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+         hook->add_messenger(mname, *this);
+       } else {
+         ceph_abort(
+             "BUG: messenger hook obj set, but not of type "
+             "AsyncMessengerSocketHook");
+       }
+      });
 }
 
 /**
@@ -315,6 +420,17 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
  */
 AsyncMessenger::~AsyncMessenger()
 {
+  cct->modify_msgr_hook(
+      []() -> AdminSocketHook* { return nullptr; },
+      [&](AdminSocketHook* ptr) {
+       if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+         hook->remove_messenger(*this);
+       } else {
+         ceph_abort(
+             "BUG: messenger hook obj set, but not of type "
+             "AsyncMessengerSocketHook");
+       }
+      });
   delete reap_handler;
   ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
   for (auto &&p : processors)
index cbc580b8d9b7da45167ebc5a124210e34ab2c743..8304ac406ff543b6d0ffd4426db81c348a6c1b2f 100644 (file)
@@ -36,6 +36,7 @@
 #include "Event.h"
 
 #include "include/ceph_assert.h"
+#include "common/admin_socket.h"
 
 class AsyncMessenger;
 
@@ -65,6 +66,26 @@ class Processor {
   friend class AsyncMessenger;
 };
 
+class AsyncMessengerSocketHook : public AdminSocketHook {
+  std::map<std::string, AsyncMessenger*> m_msgrs;
+
+ public:
+  static constexpr std::string_view COMMAND =
+      "messenger dump "
+      "name=msgr,type=CephString,req=false "
+      "name=dumpcontents,type=CephChoices,"
+      "strings=all|listen_sockets|connections|anon_conns|accepting_conns|deleted_conns,"
+      "n=N,req=false "
+      "name=tcp_info,type=CephBool,req=false";
+  AsyncMessengerSocketHook(AsyncMessenger& m, const std::string& name);
+  int call(
+      std::string_view command, const cmdmap_t& cmdmap, const bufferlist&,
+      Formatter* f, std::ostream& errss, ceph::buffer::list& out) override;
+  void add_messenger(const std::string& name, AsyncMessenger& msgr);
+  void remove_messenger(AsyncMessenger& msgr);
+  std::list<std::string> messengers() const;
+};
+
 /*
  * AsyncMessenger is represented for maintaining a set of asynchronous connections,
  * it may own a bind address and the accepted connections will be managed by
index 07f812589d117b6488dfc51a646b927d285368ab..b3a3de4843ab85be889a099f971a1ba7e1b3311c 100644 (file)
@@ -19,6 +19,7 @@
 #include <list>
 #include <memory>
 #include <set>
+#include <gmock/gmock-matchers.h>
 #include <stdlib.h>
 #include <time.h>
 #include <unistd.h>
@@ -26,6 +27,8 @@
 #include <boost/random/binomial_distribution.hpp>
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
+#include <sstream>
+#include "common/Formatter.h"
 #include <gtest/gtest.h>
 
 #define MSG_POLICY_UNIT_TESTING
@@ -40,6 +43,7 @@
 #include "msg/Message.h"
 #include "msg/Messenger.h"
 #include "msg/msg_types.h"
+#include "msg/async/AsyncMessenger.h"
 
 typedef boost::mt11213b gen_type;
 
@@ -897,6 +901,122 @@ TEST_P(MessengerTest, ReconnectRaceTest) {
   delete srv_interceptor;
 }
 
+TEST_P(MessengerTest, DumpBasics) {
+  // rudimentary check dump results. see integration test in
+  // qa/workunits/cephtool/test.sh for detailed tests
+  if (std::strstr(GetParam(), "async") == nullptr) {
+    GTEST_SKIP() << "skipping as only async messengers have a messenger dump hook";
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  auto f = Formatter::create_unique("json");
+  std::ostringstream os;
+
+  server_msgr->get_myaddrs().dump(f.get());
+  f->flush(os);
+  const auto server_addr = os.str();
+
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
+  f->reset();
+  os.clear();
+  server_msgr->dump(f.get());
+  f->flush(os);
+  const auto server_dump = os.str();
+  f->reset();
+  os.clear();
+  client_msgr->dump(f.get());
+  f->flush(os);
+  const auto client_dump = os.str();
+
+  ASSERT_THAT(server_dump, ::testing::HasSubstr(server_addr)) << server_dump;
+  ASSERT_THAT(client_dump, ::testing::HasSubstr(server_addr)) << client_dump;
+
+  server_msgr->shutdown();
+  server_msgr->wait();
+  client_msgr->shutdown();
+  client_msgr->wait();
+}
+
+TEST(MessengerTest, AdminSocketHookLifecycle) {
+  DummyAuthClientServer dummy_auth(g_ceph_context);
+  Messenger* server_msgr = Messenger::create(
+      g_ceph_context, "async+unix", entity_name_t::OSD(0), "server", getpid());
+  Messenger* client_msgr = Messenger::create(
+      g_ceph_context, "async+unix", entity_name_t::CLIENT(-1), "client",
+      getpid());
+  server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+  client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+  server_msgr->set_auth_client(&dummy_auth);
+  server_msgr->set_auth_server(&dummy_auth);
+  client_msgr->set_auth_client(&dummy_auth);
+  client_msgr->set_auth_server(&dummy_auth);
+  server_msgr->set_require_authorizer(false);
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  bool create_called = false;
+  bool add_called = false;
+  g_ceph_context->modify_msgr_hook(
+      [&]() -> AdminSocketHook* {
+        create_called = true;
+        return nullptr;
+      },
+      [&](AdminSocketHook* ptr) {
+        ASSERT_FALSE(create_called);
+        add_called = true;
+        if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+          auto msgrs = hook->messengers();
+          ASSERT_EQ(2, msgrs.size());
+          ASSERT_THAT(
+              msgrs, ::testing::UnorderedElementsAre("server", "client"));
+        } else {
+          FAIL() << "invalid type";
+        }
+      });
+  ASSERT_TRUE(add_called);
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  delete client_msgr;
+
+  g_ceph_context->modify_msgr_hook(
+      [&]() -> AdminSocketHook* {
+        create_called = true;
+        return nullptr;
+      },
+      [&](AdminSocketHook* ptr) {
+        ASSERT_FALSE(create_called);
+        add_called = true;
+        if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+          auto msgrs = hook->messengers();
+          ASSERT_EQ(1, msgrs.size());
+          ASSERT_THAT(msgrs, ::testing::ElementsAre("server"));
+        } else {
+          FAIL() << "invalid type";
+        }
+      });
+
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  delete server_msgr;
+}
+
 TEST_P(MessengerTest, SimpleTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;