#ifdef CEPH_DEBUG_MUTEX
_lockdep_obs(NULL),
#endif
+ _msgr_hook(nullptr),
crush_location(this)
{
if (options.create_log) {
lookup_or_create_singleton_object<MempoolObs>("mempool_obs", false, this);
}
+void CephContext::modify_msgr_hook(
+ std::function<AdminSocketHook*(void)> create,
+ std::function<void(AdminSocketHook*)> add) {
+ std::lock_guard l{_msgr_hook_lock};
+ if (_msgr_hook) {
+ add(_msgr_hook.get());
+ } else {
+ _msgr_hook.reset(create());
+ }
+}
+
CephContext::~CephContext()
{
associated_objs.clear();
delete _plugin_registry;
+ if (_msgr_hook) {
+ _admin_socket->unregister_commands(_msgr_hook.get());
+ }
_admin_socket->unregister_commands(_admin_hook);
delete _admin_hook;
delete _admin_socket;
#include "crush/CrushLocation.h"
class AdminSocket;
+class AdminSocketHook;
class CryptoHandler;
class CryptoRandom;
class MonMap;
#ifdef CEPH_DEBUG_MUTEX
md_config_obs_t *_lockdep_obs;
#endif
+
+ std::unique_ptr<AdminSocketHook> _msgr_hook;
+ ceph::mutex _msgr_hook_lock = ceph::make_mutex("CephContext::msgr_hook");
public:
TOPNSPC::crush::CrushLocation crush_location;
+ void modify_msgr_hook(std::function<AdminSocketHook*(void)> create,
+ std::function<void(AdminSocketHook*)> add);
private:
enum {
#include "acconfig.h"
+#include <algorithm>
#include <iostream>
#include <fstream>
+#include <iterator>
#include "AsyncMessenger.h"
#include <utime.h>
+#include <errno.h>
#include "common/Clock.h"
#include "common/Formatter.h"
+#include "common/cmdparse.h"
#include "common/config.h"
#include "common/Timer.h"
#include "common/errno.h"
}
};
+/*******************
+ * Admin Socket Hook
+ */
+
+AsyncMessengerSocketHook::AsyncMessengerSocketHook(
+ AsyncMessenger& m, const std::string& name)
+ : m_msgrs{{name, &m}} {}
+
+int AsyncMessengerSocketHook::call(
+ std::string_view command, const cmdmap_t& cmdmap, const bufferlist&,
+ Formatter* f, std::ostream& errss, ceph::buffer::list& out) {
+ if (command == "messenger dump") {
+ std::string name;
+ if (common::cmd_getval(cmdmap, "msgr", name)) {
+ if (auto it = m_msgrs.find(name); it != m_msgrs.end()) {
+ std::vector<std::string> opts;
+ const bool tcp_info =
+ common::cmd_getval_or<bool>(cmdmap, "tcp_info", false);
+ const bool has_filter =
+ common::cmd_getval(cmdmap, "dumpcontents", opts);
+ const std::set<std::string> optset(opts.begin(), opts.end());
+ const bool all = !has_filter || optset.contains("all");
+ const auto filter_fn = [&](const std::string& key) {
+ if (key == "tcp_info") {
+ return tcp_info;
+ } else if (all) {
+ return true;
+ } else {
+ return optset.contains(key);
+ }
+ };
+
+ const auto msgr = (*it).second;
+ f->open_object_section("status");
+ f->dump_string("name", name);
+ f->open_object_section("messenger");
+ msgr->dump(f, filter_fn);
+ f->close_section(); // messenger
+ f->close_section(); // status
+ return 0;
+ } else {
+ return -ENOENT;
+ }
+ } else {
+ f->open_object_section("status");
+ f->open_array_section("messengers");
+ for (const auto& [name, _] : m_msgrs) {
+ f->dump_string("name", name);
+ }
+ f->close_section();
+ f->close_section();
+ return 0;
+ }
+ }
+ return -ENOSYS;
+}
+
+void AsyncMessengerSocketHook::add_messenger(
+ const std::string& name, AsyncMessenger& msgr) {
+ m_msgrs.try_emplace(name, &msgr);
+}
+
+void AsyncMessengerSocketHook::remove_messenger(
+ AsyncMessenger& msgr) {
+ for (auto it = m_msgrs.begin(); it != m_msgrs.end(); ++it) {
+ if (&msgr == it->second) {
+ m_msgrs.erase(it);
+ break;
+ }
+ }
+}
+std::list<std::string> AsyncMessengerSocketHook::messengers() const {
+ std::list<std::string> result;
+ std::transform(m_msgrs.begin(), m_msgrs.end(), std::back_inserter(result),
+ [](const auto& pair) { return pair.first; });
+ return result;
+}
+
/*******************
* AsyncMessenger
*/
processor_num = stack->get_num_worker();
for (unsigned i = 0; i < processor_num; ++i)
processors.push_back(new Processor(this, stack->get_worker(i), cct));
+
+ cct->modify_msgr_hook(
+ [&]() {
+ auto hook = new AsyncMessengerSocketHook(*this, mname);
+ const int asok_ret = cct->get_admin_socket()->register_command(
+ AsyncMessengerSocketHook::COMMAND, hook,
+ "dump messenger status");
+ if (asok_ret != 0) {
+ ldout(cct, 0) << __func__ << " messenger asok command \""
+ << AsyncMessengerSocketHook::COMMAND
+ << "\" failed with" << asok_ret << dendl;
+ }
+ return hook;
+ },
+ [&](AdminSocketHook* ptr) {
+ if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+ hook->add_messenger(mname, *this);
+ } else {
+ ceph_abort(
+ "BUG: messenger hook obj set, but not of type "
+ "AsyncMessengerSocketHook");
+ }
+ });
}
/**
*/
AsyncMessenger::~AsyncMessenger()
{
+ cct->modify_msgr_hook(
+ []() -> AdminSocketHook* { return nullptr; },
+ [&](AdminSocketHook* ptr) {
+ if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+ hook->remove_messenger(*this);
+ } else {
+ ceph_abort(
+ "BUG: messenger hook obj set, but not of type "
+ "AsyncMessengerSocketHook");
+ }
+ });
delete reap_handler;
ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
for (auto &&p : processors)
#include "Event.h"
#include "include/ceph_assert.h"
+#include "common/admin_socket.h"
class AsyncMessenger;
friend class AsyncMessenger;
};
+class AsyncMessengerSocketHook : public AdminSocketHook {
+ std::map<std::string, AsyncMessenger*> m_msgrs;
+
+ public:
+ static constexpr std::string_view COMMAND =
+ "messenger dump "
+ "name=msgr,type=CephString,req=false "
+ "name=dumpcontents,type=CephChoices,"
+ "strings=all|listen_sockets|connections|anon_conns|accepting_conns|deleted_conns,"
+ "n=N,req=false "
+ "name=tcp_info,type=CephBool,req=false";
+ AsyncMessengerSocketHook(AsyncMessenger& m, const std::string& name);
+ int call(
+ std::string_view command, const cmdmap_t& cmdmap, const bufferlist&,
+ Formatter* f, std::ostream& errss, ceph::buffer::list& out) override;
+ void add_messenger(const std::string& name, AsyncMessenger& msgr);
+ void remove_messenger(AsyncMessenger& msgr);
+ std::list<std::string> messengers() const;
+};
+
/*
* AsyncMessenger is represented for maintaining a set of asynchronous connections,
* it may own a bind address and the accepted connections will be managed by
#include <list>
#include <memory>
#include <set>
+#include <gmock/gmock-matchers.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <boost/random/binomial_distribution.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int.hpp>
+#include <sstream>
+#include "common/Formatter.h"
#include <gtest/gtest.h>
#define MSG_POLICY_UNIT_TESTING
#include "msg/Message.h"
#include "msg/Messenger.h"
#include "msg/msg_types.h"
+#include "msg/async/AsyncMessenger.h"
typedef boost::mt11213b gen_type;
delete srv_interceptor;
}
+TEST_P(MessengerTest, DumpBasics) {
+ // rudimentary check dump results. see integration test in
+ // qa/workunits/cephtool/test.sh for detailed tests
+ if (std::strstr(GetParam(), "async") == nullptr) {
+ GTEST_SKIP() << "skipping as only async messengers have a messenger dump hook";
+ }
+
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ auto f = Formatter::create_unique("json");
+ std::ostringstream os;
+
+ server_msgr->get_myaddrs().dump(f.get());
+ f->flush(os);
+ const auto server_addr = os.str();
+
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ f->reset();
+ os.clear();
+ server_msgr->dump(f.get());
+ f->flush(os);
+ const auto server_dump = os.str();
+ f->reset();
+ os.clear();
+ client_msgr->dump(f.get());
+ f->flush(os);
+ const auto client_dump = os.str();
+
+ ASSERT_THAT(server_dump, ::testing::HasSubstr(server_addr)) << server_dump;
+ ASSERT_THAT(client_dump, ::testing::HasSubstr(server_addr)) << client_dump;
+
+ server_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->shutdown();
+ client_msgr->wait();
+}
+
+TEST(MessengerTest, AdminSocketHookLifecycle) {
+ DummyAuthClientServer dummy_auth(g_ceph_context);
+ Messenger* server_msgr = Messenger::create(
+ g_ceph_context, "async+unix", entity_name_t::OSD(0), "server", getpid());
+ Messenger* client_msgr = Messenger::create(
+ g_ceph_context, "async+unix", entity_name_t::CLIENT(-1), "client",
+ getpid());
+ server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+ server_msgr->set_auth_client(&dummy_auth);
+ server_msgr->set_auth_server(&dummy_auth);
+ client_msgr->set_auth_client(&dummy_auth);
+ client_msgr->set_auth_server(&dummy_auth);
+ server_msgr->set_require_authorizer(false);
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ bool create_called = false;
+ bool add_called = false;
+ g_ceph_context->modify_msgr_hook(
+ [&]() -> AdminSocketHook* {
+ create_called = true;
+ return nullptr;
+ },
+ [&](AdminSocketHook* ptr) {
+ ASSERT_FALSE(create_called);
+ add_called = true;
+ if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+ auto msgrs = hook->messengers();
+ ASSERT_EQ(2, msgrs.size());
+ ASSERT_THAT(
+ msgrs, ::testing::UnorderedElementsAre("server", "client"));
+ } else {
+ FAIL() << "invalid type";
+ }
+ });
+ ASSERT_TRUE(add_called);
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ delete client_msgr;
+
+ g_ceph_context->modify_msgr_hook(
+ [&]() -> AdminSocketHook* {
+ create_called = true;
+ return nullptr;
+ },
+ [&](AdminSocketHook* ptr) {
+ ASSERT_FALSE(create_called);
+ add_called = true;
+ if (auto hook = dynamic_cast<AsyncMessengerSocketHook*>(ptr)) {
+ auto msgrs = hook->messengers();
+ ASSERT_EQ(1, msgrs.size());
+ ASSERT_THAT(msgrs, ::testing::ElementsAre("server"));
+ } else {
+ FAIL() << "invalid type";
+ }
+ });
+
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete server_msgr;
+}
+
TEST_P(MessengerTest, SimpleTest) {
FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;