From: Marcel Lauhoff Date: Fri, 20 Sep 2024 17:33:07 +0000 (+0200) Subject: msg: Add `messenger dump ` asok command X-Git-Tag: v20.3.0~373^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4b7d43751a80b058ada467a68b88f91ec39f290c;p=ceph.git msg: Add `messenger dump ` asok command Create admin socket command `messenger dump [filter] [--tcp-info]` on messenger initialization. `` is the human readable messenger name (e.g "client", "ms_objecter"). The filter argument allows partial dumping. `tcp_info` controls inclusion of TCP_INFO stats. Calling `messenger dump` without a messenger name returns a list of messengers. Signed-off-by: Marcel Lauhoff --- diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc index 4bf1086e31d9..0769eb99ca79 100644 --- a/src/common/ceph_context.cc +++ b/src/common/ceph_context.cc @@ -713,6 +713,7 @@ CephContext::CephContext(uint32_t module_type_, #ifdef CEPH_DEBUG_MUTEX _lockdep_obs(NULL), #endif + _msgr_hook(nullptr), crush_location(this) { if (options.create_log) { @@ -775,6 +776,17 @@ CephContext::CephContext(uint32_t module_type_, lookup_or_create_singleton_object("mempool_obs", false, this); } +void CephContext::modify_msgr_hook( + std::function create, + std::function add) { + std::lock_guard l{_msgr_hook_lock}; + if (_msgr_hook) { + add(_msgr_hook.get()); + } else { + _msgr_hook.reset(create()); + } +} + CephContext::~CephContext() { associated_objs.clear(); @@ -788,6 +800,9 @@ CephContext::~CephContext() delete _plugin_registry; + if (_msgr_hook) { + _admin_socket->unregister_commands(_msgr_hook.get()); + } _admin_socket->unregister_commands(_admin_hook); delete _admin_hook; delete _admin_socket; diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h index 6a02d5c5bf1f..62c82d09b6c1 100644 --- a/src/common/ceph_context.h +++ b/src/common/ceph_context.h @@ -47,6 +47,7 @@ #include "crush/CrushLocation.h" class AdminSocket; +class AdminSocketHook; class CryptoHandler; class CryptoRandom; class MonMap; @@ -381,8 +382,13 @@ private: #ifdef CEPH_DEBUG_MUTEX md_config_obs_t *_lockdep_obs; #endif + + std::unique_ptr _msgr_hook; + ceph::mutex _msgr_hook_lock = ceph::make_mutex("CephContext::msgr_hook"); public: TOPNSPC::crush::CrushLocation crush_location; + void modify_msgr_hook(std::function create, + std::function add); private: enum { diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 34c7f6477871..82fcce93e9a5 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -16,14 +16,18 @@ #include "acconfig.h" +#include #include #include +#include #include "AsyncMessenger.h" #include +#include #include "common/Clock.h" #include "common/Formatter.h" +#include "common/cmdparse.h" #include "common/config.h" #include "common/Timer.h" #include "common/errno.h" @@ -276,6 +280,84 @@ class C_handle_reap : public EventCallback { } }; +/******************* + * Admin Socket Hook + */ + +AsyncMessengerSocketHook::AsyncMessengerSocketHook( + AsyncMessenger& m, const std::string& name) + : m_msgrs{{name, &m}} {} + +int AsyncMessengerSocketHook::call( + std::string_view command, const cmdmap_t& cmdmap, const bufferlist&, + Formatter* f, std::ostream& errss, ceph::buffer::list& out) { + if (command == "messenger dump") { + std::string name; + if (common::cmd_getval(cmdmap, "msgr", name)) { + if (auto it = m_msgrs.find(name); it != m_msgrs.end()) { + std::vector opts; + const bool tcp_info = + common::cmd_getval_or(cmdmap, "tcp_info", false); + const bool has_filter = + common::cmd_getval(cmdmap, "dumpcontents", opts); + const std::set optset(opts.begin(), opts.end()); + const bool all = !has_filter || optset.contains("all"); + const auto filter_fn = [&](const std::string& key) { + if (key == "tcp_info") { + return tcp_info; + } else if (all) { + return true; + } else { + return optset.contains(key); + } + }; + + const auto msgr = (*it).second; + f->open_object_section("status"); + f->dump_string("name", name); + f->open_object_section("messenger"); + msgr->dump(f, filter_fn); + f->close_section(); // messenger + f->close_section(); // status + return 0; + } else { + return -ENOENT; + } + } else { + f->open_object_section("status"); + f->open_array_section("messengers"); + for (const auto& [name, _] : m_msgrs) { + f->dump_string("name", name); + } + f->close_section(); + f->close_section(); + return 0; + } + } + return -ENOSYS; +} + +void AsyncMessengerSocketHook::add_messenger( + const std::string& name, AsyncMessenger& msgr) { + m_msgrs.try_emplace(name, &msgr); +} + +void AsyncMessengerSocketHook::remove_messenger( + AsyncMessenger& msgr) { + for (auto it = m_msgrs.begin(); it != m_msgrs.end(); ++it) { + if (&msgr == it->second) { + m_msgrs.erase(it); + break; + } + } +} +std::list AsyncMessengerSocketHook::messengers() const { + std::list result; + std::transform(m_msgrs.begin(), m_msgrs.end(), std::back_inserter(result), + [](const auto& pair) { return pair.first; }); + return result; +} + /******************* * AsyncMessenger */ @@ -307,6 +389,29 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, processor_num = stack->get_num_worker(); for (unsigned i = 0; i < processor_num; ++i) processors.push_back(new Processor(this, stack->get_worker(i), cct)); + + cct->modify_msgr_hook( + [&]() { + auto hook = new AsyncMessengerSocketHook(*this, mname); + const int asok_ret = cct->get_admin_socket()->register_command( + AsyncMessengerSocketHook::COMMAND, hook, + "dump messenger status"); + if (asok_ret != 0) { + ldout(cct, 0) << __func__ << " messenger asok command \"" + << AsyncMessengerSocketHook::COMMAND + << "\" failed with" << asok_ret << dendl; + } + return hook; + }, + [&](AdminSocketHook* ptr) { + if (auto hook = dynamic_cast(ptr)) { + hook->add_messenger(mname, *this); + } else { + ceph_abort( + "BUG: messenger hook obj set, but not of type " + "AsyncMessengerSocketHook"); + } + }); } /** @@ -315,6 +420,17 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, */ AsyncMessenger::~AsyncMessenger() { + cct->modify_msgr_hook( + []() -> AdminSocketHook* { return nullptr; }, + [&](AdminSocketHook* ptr) { + if (auto hook = dynamic_cast(ptr)) { + hook->remove_messenger(*this); + } else { + ceph_abort( + "BUG: messenger hook obj set, but not of type " + "AsyncMessengerSocketHook"); + } + }); delete reap_handler; ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor for (auto &&p : processors) diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index cbc580b8d9b7..8304ac406ff5 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -36,6 +36,7 @@ #include "Event.h" #include "include/ceph_assert.h" +#include "common/admin_socket.h" class AsyncMessenger; @@ -65,6 +66,26 @@ class Processor { friend class AsyncMessenger; }; +class AsyncMessengerSocketHook : public AdminSocketHook { + std::map m_msgrs; + + public: + static constexpr std::string_view COMMAND = + "messenger dump " + "name=msgr,type=CephString,req=false " + "name=dumpcontents,type=CephChoices," + "strings=all|listen_sockets|connections|anon_conns|accepting_conns|deleted_conns," + "n=N,req=false " + "name=tcp_info,type=CephBool,req=false"; + AsyncMessengerSocketHook(AsyncMessenger& m, const std::string& name); + int call( + std::string_view command, const cmdmap_t& cmdmap, const bufferlist&, + Formatter* f, std::ostream& errss, ceph::buffer::list& out) override; + void add_messenger(const std::string& name, AsyncMessenger& msgr); + void remove_messenger(AsyncMessenger& msgr); + std::list messengers() const; +}; + /* * AsyncMessenger is represented for maintaining a set of asynchronous connections, * it may own a bind address and the accepted connections will be managed by diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 07f812589d11..b3a3de4843ab 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,8 @@ #include #include #include +#include +#include "common/Formatter.h" #include #define MSG_POLICY_UNIT_TESTING @@ -40,6 +43,7 @@ #include "msg/Message.h" #include "msg/Messenger.h" #include "msg/msg_types.h" +#include "msg/async/AsyncMessenger.h" typedef boost::mt11213b gen_type; @@ -897,6 +901,122 @@ TEST_P(MessengerTest, ReconnectRaceTest) { delete srv_interceptor; } +TEST_P(MessengerTest, DumpBasics) { + // rudimentary check dump results. see integration test in + // qa/workunits/cephtool/test.sh for detailed tests + if (std::strstr(GetParam(), "async") == nullptr) { + GTEST_SKIP() << "skipping as only async messengers have a messenger dump hook"; + } + + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + auto f = Formatter::create_unique("json"); + std::ostringstream os; + + server_msgr->get_myaddrs().dump(f.get()); + f->flush(os); + const auto server_addr = os.str(); + + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + f->reset(); + os.clear(); + server_msgr->dump(f.get()); + f->flush(os); + const auto server_dump = os.str(); + f->reset(); + os.clear(); + client_msgr->dump(f.get()); + f->flush(os); + const auto client_dump = os.str(); + + ASSERT_THAT(server_dump, ::testing::HasSubstr(server_addr)) << server_dump; + ASSERT_THAT(client_dump, ::testing::HasSubstr(server_addr)) << client_dump; + + server_msgr->shutdown(); + server_msgr->wait(); + client_msgr->shutdown(); + client_msgr->wait(); +} + +TEST(MessengerTest, AdminSocketHookLifecycle) { + DummyAuthClientServer dummy_auth(g_ceph_context); + Messenger* server_msgr = Messenger::create( + g_ceph_context, "async+unix", entity_name_t::OSD(0), "server", getpid()); + Messenger* client_msgr = Messenger::create( + g_ceph_context, "async+unix", entity_name_t::CLIENT(-1), "client", + getpid()); + server_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + client_msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + server_msgr->set_auth_client(&dummy_auth); + server_msgr->set_auth_server(&dummy_auth); + client_msgr->set_auth_client(&dummy_auth); + client_msgr->set_auth_server(&dummy_auth); + server_msgr->set_require_authorizer(false); + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + bool create_called = false; + bool add_called = false; + g_ceph_context->modify_msgr_hook( + [&]() -> AdminSocketHook* { + create_called = true; + return nullptr; + }, + [&](AdminSocketHook* ptr) { + ASSERT_FALSE(create_called); + add_called = true; + if (auto hook = dynamic_cast(ptr)) { + auto msgrs = hook->messengers(); + ASSERT_EQ(2, msgrs.size()); + ASSERT_THAT( + msgrs, ::testing::UnorderedElementsAre("server", "client")); + } else { + FAIL() << "invalid type"; + } + }); + ASSERT_TRUE(add_called); + + client_msgr->shutdown(); + client_msgr->wait(); + delete client_msgr; + + g_ceph_context->modify_msgr_hook( + [&]() -> AdminSocketHook* { + create_called = true; + return nullptr; + }, + [&](AdminSocketHook* ptr) { + ASSERT_FALSE(create_called); + add_called = true; + if (auto hook = dynamic_cast(ptr)) { + auto msgrs = hook->messengers(); + ASSERT_EQ(1, msgrs.size()); + ASSERT_THAT(msgrs, ::testing::ElementsAre("server")); + } else { + FAIL() << "invalid type"; + } + }); + + server_msgr->shutdown(); + server_msgr->wait(); + + delete server_msgr; +} + TEST_P(MessengerTest, SimpleTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr;