From c30fb84d9c6e3469cd55fca096ed9e0e1804ef7f Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 13 Feb 2023 11:28:19 +0800 Subject: [PATCH] test/crimson: cleanup messenger test addresses Consolidate the name/port/nonce of the addresses used in the messenger unit tests. Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 107 ++++++++++-------- .../crimson/{test_cmds.h => test_messenger.h} | 19 ++++ src/test/crimson/test_messenger_peer.cc | 54 ++++++--- 3 files changed, 116 insertions(+), 64 deletions(-) rename src/test/crimson/{test_cmds.h => test_messenger.h} (75%) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index a105401de43..dd43496c50e 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -27,7 +27,7 @@ #include #include -#include "test_cmds.h" +#include "test_messenger.h" using namespace std::chrono_literals; namespace bpo = boost::program_options; @@ -497,8 +497,7 @@ using crimson::net::Messenger; using crimson::net::MessengerRef; using crimson::net::SocketPolicy; using crimson::net::tag_bp_t; -using ceph::net::test::cmd_t; -using ceph::net::test::policy_t; +using namespace ceph::net::test; struct counter_t { unsigned counter = 0; }; @@ -929,16 +928,16 @@ class FailoverSuite : public Dispatcher { } private: - seastar::future<> init(entity_addr_t addr, SocketPolicy policy) { + seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) { test_msgr->set_default_policy(policy); test_msgr->set_auth_client(&dummy_auth); test_msgr->set_auth_server(&dummy_auth); test_msgr->set_interceptor(&interceptor); - return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] { + return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] { return test_msgr->start({this}); - }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) { + }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) { logger().error("FailoverSuite: " - "there is another instance running at {}", addr); + "there is another instance running at {}", test_addr); ceph_abort(); })); } @@ -1106,7 +1105,7 @@ class FailoverSuite : public Dispatcher { entity_addr_t test_peer_addr, const TestInterceptor& interceptor) { auto suite = std::make_unique( - Messenger::create(entity_name_t::OSD(2), "Test", 2), + Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE), test_peer_addr, interceptor); return suite->init(test_addr, test_policy ).then([suite = std::move(suite)] () mutable { @@ -1316,14 +1315,11 @@ class FailoverTest : public Dispatcher { } static seastar::future> - create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) { - test_addr.set_nonce(2); - cmd_peer_addr.set_nonce(3); - entity_addr_t test_peer_addr = cmd_peer_addr; - test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); - test_peer_addr.set_nonce(4); + create(entity_addr_t test_addr, + entity_addr_t cmd_peer_addr, + entity_addr_t test_peer_addr) { auto test = seastar::make_lw_shared( - Messenger::create(entity_name_t::OSD(1), "CmdCli", 1), + Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE), test_addr, test_peer_addr); return test->init(cmd_peer_addr).then([test] { logger().info("CmdCli ready"); @@ -1446,15 +1442,15 @@ class FailoverSuitePeer : public Dispatcher { } private: - seastar::future<> init(entity_addr_t addr, SocketPolicy policy) { + seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) { peer_msgr->set_default_policy(policy); peer_msgr->set_auth_client(&dummy_auth); peer_msgr->set_auth_server(&dummy_auth); - return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] { + return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] { return peer_msgr->start({this}); - }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) { + }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) { logger().error("FailoverSuitePeer: " - "there is another instance running at {}", addr); + "there is another instance running at {}", test_peer_addr); ceph_abort(); })); } @@ -1491,9 +1487,9 @@ class FailoverSuitePeer : public Dispatcher { return peer_msgr->shutdown(); } - seastar::future<> connect_peer(entity_addr_t addr) { - logger().info("[TestPeer] connect_peer({})", addr); - auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD); + seastar::future<> connect_peer(entity_addr_t test_addr_decoded) { + logger().info("[TestPeer] connect_peer({})", test_addr_decoded); + auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD); if (tracked_conn) { if (tracked_conn->is_closed()) { ceph_assert(tracked_conn != new_tracked_conn); @@ -1535,10 +1531,15 @@ class FailoverSuitePeer : public Dispatcher { } static seastar::future> - create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) { + create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) { auto suite = std::make_unique( - Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback); - return suite->init(addr, policy + Messenger::create( + entity_name_t::OSD(TEST_PEER_OSD), + "TestPeer", + TEST_PEER_NONCE), + op_callback + ); + return suite->init(test_peer_addr, policy ).then([suite = std::move(suite)] () mutable { return std::move(suite); }); @@ -1597,8 +1598,8 @@ class FailoverTestPeer : public Dispatcher { case cmd_t::suite_start: { ceph_assert(!test_suite); auto policy = to_socket_policy(static_cast(m_cmd->cmd[1][0])); - return FailoverSuitePeer::create(test_peer_addr, policy, - [this] { return notify_recv_op(); } + return FailoverSuitePeer::create( + test_peer_addr, policy, [this] { return notify_recv_op(); } ).then([this] (auto suite) { test_suite.swap(suite); }); @@ -1610,9 +1611,9 @@ class FailoverTestPeer : public Dispatcher { }); case cmd_t::suite_connect_me: { ceph_assert(test_suite); - entity_addr_t test_addr = entity_addr_t(); - test_addr.parse(m_cmd->cmd[1].c_str(), nullptr); - return test_suite->connect_peer(test_addr); + entity_addr_t test_addr_decoded = entity_addr_t(); + test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr); + return test_suite->connect_peer(test_addr_decoded); } case cmd_t::suite_send_me: ceph_assert(test_suite); @@ -1655,12 +1656,10 @@ class FailoverTestPeer : public Dispatcher { } static seastar::future> - create(entity_addr_t cmd_peer_addr) { - // suite bind to cmd_peer_addr, with port + 1 - entity_addr_t test_peer_addr = cmd_peer_addr; - test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); + create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) { auto test_peer = std::make_unique( - Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr); + Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE), + test_peer_addr); return test_peer->init(cmd_peer_addr ).then([test_peer = std::move(test_peer)] () mutable { logger().info("CmdSrv ready"); @@ -3464,17 +3463,19 @@ test_v2_lossless_peer_acceptor(FailoverTest& test) { seastar::future<> test_v2_protocol(entity_addr_t test_addr, + entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr, bool test_peer_islocal) { ceph_assert_always(test_addr.is_msgr2()); + ceph_assert_always(cmd_peer_addr.is_msgr2()); ceph_assert_always(test_peer_addr.is_msgr2()); if (test_peer_islocal) { // initiate crimson test peer locally - logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr); - return FailoverTestPeer::create(test_peer_addr - ).then([test_addr, test_peer_addr] (auto peer) { - return test_v2_protocol(test_addr, test_peer_addr, false + logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr); + return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr + ).then([test_addr, cmd_peer_addr, test_peer_addr](auto peer) { + return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, false ).then([peer = std::move(peer)] () mutable { return peer->wait().then([peer = std::move(peer)] {}); }); @@ -3484,7 +3485,8 @@ test_v2_protocol(entity_addr_t test_addr, }); } - return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) { + return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr + ).then([](auto test) { return seastar::futurize_invoke([test] { return test_v2_lossy_early_connect_fault(*test); }).then([test] { @@ -3599,20 +3601,33 @@ seastar::future do_test(seastar::app_template& app) verbose = config["verbose"].as(); auto rounds = config["rounds"].as(); auto keepalive_ratio = config["keepalive-ratio"].as(); + auto testpeer_islocal = config["testpeer-islocal"].as(); + entity_addr_t test_addr; ceph_assert(test_addr.parse( config["test-addr"].as().c_str(), nullptr)); - entity_addr_t testpeer_addr; - ceph_assert(testpeer_addr.parse( + test_addr.set_nonce(TEST_NONCE); + + entity_addr_t cmd_peer_addr; + ceph_assert(cmd_peer_addr.parse( config["testpeer-addr"].as().c_str(), nullptr)); - auto testpeer_islocal = config["testpeer-islocal"].as(); + cmd_peer_addr.set_nonce(CMD_SRV_NONCE); + + entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr); + + logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, " + "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, " + "testpeer_islocal={}", + verbose, rounds, keepalive_ratio, + test_addr, cmd_peer_addr, test_peer_addr, + testpeer_islocal); return test_echo(rounds, keepalive_ratio) .then([] { return test_concurrent_dispatch(); }).then([] { return test_preemptive_shutdown(); - }).then([test_addr, testpeer_addr, testpeer_islocal] { - return test_v2_protocol(test_addr, testpeer_addr, testpeer_islocal); + }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal] { + return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal); }).then([] { logger().info("All tests succeeded"); // Seastar has bugs to have events undispatched during shutdown, @@ -3643,7 +3658,7 @@ int main(int argc, char** argv) "address of v2 failover tests") ("testpeer-addr", bpo::value()->default_value("v2:127.0.0.1:9013"), "addresses of v2 failover testpeer" - " (CmdSrv address and TestPeer address with port+=1)") + " (This is CmdSrv address, and TestPeer address is at port+=1)") ("testpeer-islocal", bpo::value()->default_value(true), "create a local crimson testpeer, or connect to a remote testpeer"); return app.run(argc, argv, [&app] { diff --git a/src/test/crimson/test_cmds.h b/src/test/crimson/test_messenger.h similarity index 75% rename from src/test/crimson/test_cmds.h rename to src/test/crimson/test_messenger.h index 2320d30a05d..75d060f6f39 100644 --- a/src/test/crimson/test_cmds.h +++ b/src/test/crimson/test_messenger.h @@ -3,8 +3,27 @@ #pragma once +#include "msg/msg_types.h" + namespace ceph::net::test { +constexpr uint64_t CMD_CLI_NONCE = 1; +constexpr int64_t CMD_CLI_OSD = 1; +constexpr uint64_t TEST_NONCE = 2; +constexpr int64_t TEST_OSD = 2; +constexpr uint64_t CMD_SRV_NONCE = 3; +constexpr int64_t CMD_SRV_OSD = 3; +constexpr uint64_t TEST_PEER_NONCE = 4; +constexpr int64_t TEST_PEER_OSD = 4; + +inline entity_addr_t get_test_peer_addr( + const entity_addr_t &cmd_peer_addr) { + entity_addr_t test_peer_addr = cmd_peer_addr; + test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); + test_peer_addr.set_nonce(TEST_PEER_NONCE); + return test_peer_addr; +} + enum class cmd_t : char { none = '\0', shutdown, diff --git a/src/test/crimson/test_messenger_peer.cc b/src/test/crimson/test_messenger_peer.cc index 0232262fc59..34d7a7413fc 100644 --- a/src/test/crimson/test_messenger_peer.cc +++ b/src/test/crimson/test_messenger_peer.cc @@ -14,14 +14,13 @@ #include "msg/Dispatcher.h" #include "msg/Messenger.h" -#include "test_cmds.h" +#include "test_messenger.h" namespace { #define dout_subsys ceph_subsys_test -using ceph::net::test::cmd_t; -using ceph::net::test::policy_t; +using namespace ceph::net::test; using SocketPolicy = Messenger::Policy; constexpr int CEPH_OSD_PROTOCOL = 10; @@ -105,7 +104,11 @@ class FailoverSuitePeer : public Dispatcher { private: void init(entity_addr_t test_peer_addr, SocketPolicy policy) { - peer_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(4), "TestPeer", 4)); + peer_msgr.reset(Messenger::create( + cct, "async", + entity_name_t::OSD(TEST_PEER_OSD), + "TestPeer", + TEST_PEER_NONCE)); dummy_auth.auth_registry.refresh_config(); peer_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); peer_msgr->set_default_policy(policy); @@ -361,7 +364,11 @@ class FailoverTestPeer : public Dispatcher { } void init(entity_addr_t cmd_peer_addr) { - cmd_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(3), "CmdSrv", 3)); + cmd_msgr.reset(Messenger::create( + cct, "async", + entity_name_t::OSD(CMD_SRV_OSD), + "CmdSrv", + CMD_SRV_NONCE)); dummy_auth.auth_registry.refresh_config(); cmd_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); cmd_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); @@ -384,11 +391,12 @@ class FailoverTestPeer : public Dispatcher { void wait() { cmd_msgr->wait(); } static std::unique_ptr - create(CephContext* cct, entity_addr_t cmd_peer_addr, bool nonstop) { - // suite bind to cmd_peer_addr, with port + 1 - entity_addr_t test_peer_addr = cmd_peer_addr; - test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); - auto test_peer = std::make_unique(cct, test_peer_addr, nonstop); + create(CephContext* cct, + entity_addr_t cmd_peer_addr, + entity_addr_t test_peer_addr, + bool nonstop) { + auto test_peer = std::make_unique( + cct, test_peer_addr, nonstop); test_peer->init(cmd_peer_addr); ldout(cct, 0) << "[CmdSrv] ready" << dendl; return test_peer; @@ -404,7 +412,7 @@ int main(int argc, char** argv) desc.add_options() ("help,h", "show help message") ("addr", po::value()->default_value("v2:127.0.0.1:9013"), - "CmdSrv address, and TestPeer address with port+=1") + "This is CmdSrv address, and TestPeer address is at port+=1") ("nonstop", po::value()->default_value(false), "Do not shutdown TestPeer when all tests are successful"); po::variables_map vm; @@ -426,12 +434,6 @@ int main(int argc, char** argv) return 1; } - auto addr = vm["addr"].as(); - entity_addr_t cmd_peer_addr; - cmd_peer_addr.parse(addr.c_str(), nullptr); - ceph_assert_always(cmd_peer_addr.is_msgr2()); - auto nonstop = vm["nonstop"].as(); - std::vector args(argv, argv + argc); auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, @@ -439,6 +441,22 @@ int main(int argc, char** argv) CINIT_FLAG_NO_MON_CONFIG); common_init_finish(cct.get()); - auto test_peer = FailoverTestPeer::create(cct.get(), cmd_peer_addr, nonstop); + auto addr = vm["addr"].as(); + entity_addr_t cmd_peer_addr; + cmd_peer_addr.parse(addr.c_str(), nullptr); + cmd_peer_addr.set_nonce(CMD_SRV_NONCE); + ceph_assert_always(cmd_peer_addr.is_msgr2()); + auto test_peer_addr = get_test_peer_addr(cmd_peer_addr); + auto nonstop = vm["nonstop"].as(); + ldout(cct, 0) << "test configuration: cmd_peer_addr=" << cmd_peer_addr + << ", test_peer_addr=" << test_peer_addr + << ", nonstop=" << nonstop + << dendl; + + auto test_peer = FailoverTestPeer::create( + cct.get(), + cmd_peer_addr, + test_peer_addr, + nonstop); test_peer->wait(); } -- 2.39.5