git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: cleanup messenger test addresses
author Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 13 Feb 2023 03:28:19 +0000 (11:28 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 13 Feb 2023 03:28:19 +0000 (11:28 +0800)
Consolidate the name/port/nonce of the addresses used in the messenger
unit tests.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_cmds.h [deleted file]
src/test/crimson/test_messenger.cc
src/test/crimson/test_messenger.h [new file with mode: 0644]
src/test/crimson/test_messenger_peer.cc

diff --git a/src/test/crimson/test_cmds.h b/src/test/crimson/test_cmds.h
deleted file mode 100644 (file)
index 2320d30..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-namespace ceph::net::test {
-
-enum class cmd_t : char {
-  none = '\0',
-  shutdown,
-  suite_start,
-  suite_stop,
-  suite_connect_me,
-  suite_send_me,
-  suite_keepalive_me,
-  suite_markdown,
-  suite_recv_op
-};
-
-enum class policy_t : char {
-  none = '\0',
-  stateful_server,
-  stateless_server,
-  lossless_peer,
-  lossless_peer_reuse,
-  lossy_client,
-  lossless_client
-};
-
-inline std::ostream& operator<<(std::ostream& out, const cmd_t& cmd) {
-  switch(cmd) {
-   case cmd_t::none:
-    return out << "none";
-   case cmd_t::shutdown:
-    return out << "shutdown";
-   case cmd_t::suite_start:
-    return out << "suite_start";
-   case cmd_t::suite_stop:
-    return out << "suite_stop";
-   case cmd_t::suite_connect_me:
-    return out << "suite_connect_me";
-   case cmd_t::suite_send_me:
-    return out << "suite_send_me";
-   case cmd_t::suite_keepalive_me:
-    return out << "suite_keepalive_me";
-   case cmd_t::suite_markdown:
-    return out << "suite_markdown";
-   case cmd_t::suite_recv_op:
-    return out << "suite_recv_op";
-   default:
-    ceph_abort();
-  }
-}
-
-inline std::ostream& operator<<(std::ostream& out, const policy_t& policy) {
-  switch(policy) {
-   case policy_t::none:
-    return out << "none";
-   case policy_t::stateful_server:
-    return out << "stateful_server";
-   case policy_t::stateless_server:
-    return out << "stateless_server";
-   case policy_t::lossless_peer:
-    return out << "lossless_peer";
-   case policy_t::lossless_peer_reuse:
-    return out << "lossless_peer_reuse";
-   case policy_t::lossy_client:
-    return out << "lossy_client";
-   case policy_t::lossless_client:
-    return out << "lossless_client";
-   default:
-    ceph_abort();
-  }
-}
-
-} // namespace ceph::net::test
index a105401de431b8a86cedab2d0c9a5819d268574f..dd43496c50e6e6c3dc4fb45215f1a23d72c54aa7 100644 (file)
@@ -27,7 +27,7 @@
 #include <seastar/core/sleep.hh>
 #include <seastar/core/with_timeout.hh>
 
-#include "test_cmds.h"
+#include "test_messenger.h"
 
 using namespace std::chrono_literals;
 namespace bpo = boost::program_options;
@@ -497,8 +497,7 @@ using crimson::net::Messenger;
 using crimson::net::MessengerRef;
 using crimson::net::SocketPolicy;
 using crimson::net::tag_bp_t;
-using ceph::net::test::cmd_t;
-using ceph::net::test::policy_t;
+using namespace ceph::net::test;
 
 struct counter_t { unsigned counter = 0; };
 
@@ -929,16 +928,16 @@ class FailoverSuite : public Dispatcher {
   }
 
  private:
-  seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+  seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) {
     test_msgr->set_default_policy(policy);
     test_msgr->set_auth_client(&dummy_auth);
     test_msgr->set_auth_server(&dummy_auth);
     test_msgr->set_interceptor(&interceptor);
-    return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+    return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] {
       return test_msgr->start({this});
-    }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+    }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) {
       logger().error("FailoverSuite: "
-                     "there is another instance running at {}", addr);
+                     "there is another instance running at {}", test_addr);
       ceph_abort();
     }));
   }
@@ -1106,7 +1105,7 @@ class FailoverSuite : public Dispatcher {
          entity_addr_t test_peer_addr,
          const TestInterceptor& interceptor) {
     auto suite = std::make_unique<FailoverSuite>(
-        Messenger::create(entity_name_t::OSD(2), "Test", 2),
+        Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
         test_peer_addr, interceptor);
     return suite->init(test_addr, test_policy
     ).then([suite = std::move(suite)] () mutable {
@@ -1316,14 +1315,11 @@ class FailoverTest : public Dispatcher {
   }
 
   static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
-  create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
-    test_addr.set_nonce(2);
-    cmd_peer_addr.set_nonce(3);
-    entity_addr_t test_peer_addr = cmd_peer_addr;
-    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
-    test_peer_addr.set_nonce(4);
+  create(entity_addr_t test_addr,
+         entity_addr_t cmd_peer_addr,
+         entity_addr_t test_peer_addr) {
     auto test = seastar::make_lw_shared<FailoverTest>(
-        Messenger::create(entity_name_t::OSD(1), "CmdCli", 1),
+        Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
         test_addr, test_peer_addr);
     return test->init(cmd_peer_addr).then([test] {
       logger().info("CmdCli ready");
@@ -1446,15 +1442,15 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
  private:
-  seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+  seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) {
     peer_msgr->set_default_policy(policy);
     peer_msgr->set_auth_client(&dummy_auth);
     peer_msgr->set_auth_server(&dummy_auth);
-    return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+    return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] {
       return peer_msgr->start({this});
-    }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+    }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) {
       logger().error("FailoverSuitePeer: "
-                     "there is another instance running at {}", addr);
+                     "there is another instance running at {}", test_peer_addr);
       ceph_abort();
     }));
   }
@@ -1491,9 +1487,9 @@ class FailoverSuitePeer : public Dispatcher {
     return peer_msgr->shutdown();
   }
 
-  seastar::future<> connect_peer(entity_addr_t addr) {
-    logger().info("[TestPeer] connect_peer({})", addr);
-    auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD);
+  seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
+    logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
+    auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
     if (tracked_conn) {
       if (tracked_conn->is_closed()) {
         ceph_assert(tracked_conn != new_tracked_conn);
@@ -1535,10 +1531,15 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
   static seastar::future<std::unique_ptr<FailoverSuitePeer>>
-  create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
+  create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) {
     auto suite = std::make_unique<FailoverSuitePeer>(
-        Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback);
-    return suite->init(addr, policy
+      Messenger::create(
+        entity_name_t::OSD(TEST_PEER_OSD),
+        "TestPeer",
+        TEST_PEER_NONCE),
+      op_callback
+    );
+    return suite->init(test_peer_addr, policy
     ).then([suite = std::move(suite)] () mutable {
       return std::move(suite);
     });
@@ -1597,8 +1598,8 @@ class FailoverTestPeer : public Dispatcher {
      case cmd_t::suite_start: {
       ceph_assert(!test_suite);
       auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
-      return FailoverSuitePeer::create(test_peer_addr, policy,
-                                       [this] { return notify_recv_op(); }
+      return FailoverSuitePeer::create(
+        test_peer_addr, policy, [this] { return notify_recv_op(); }
       ).then([this] (auto suite) {
         test_suite.swap(suite);
       });
@@ -1610,9 +1611,9 @@ class FailoverTestPeer : public Dispatcher {
       });
      case cmd_t::suite_connect_me: {
       ceph_assert(test_suite);
-      entity_addr_t test_addr = entity_addr_t();
-      test_addr.parse(m_cmd->cmd[1].c_str(), nullptr);
-      return test_suite->connect_peer(test_addr);
+      entity_addr_t test_addr_decoded = entity_addr_t();
+      test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr);
+      return test_suite->connect_peer(test_addr_decoded);
      }
      case cmd_t::suite_send_me:
       ceph_assert(test_suite);
@@ -1655,12 +1656,10 @@ class FailoverTestPeer : public Dispatcher {
   }
 
   static seastar::future<std::unique_ptr<FailoverTestPeer>>
-  create(entity_addr_t cmd_peer_addr) {
-    // suite bind to cmd_peer_addr, with port + 1
-    entity_addr_t test_peer_addr = cmd_peer_addr;
-    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+  create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
     auto test_peer = std::make_unique<FailoverTestPeer>(
-        Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr);
+        Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
+        test_peer_addr);
     return test_peer->init(cmd_peer_addr
     ).then([test_peer = std::move(test_peer)] () mutable {
       logger().info("CmdSrv ready");
@@ -3464,17 +3463,19 @@ test_v2_lossless_peer_acceptor(FailoverTest& test) {
 
 seastar::future<>
 test_v2_protocol(entity_addr_t test_addr,
+                 entity_addr_t cmd_peer_addr,
                  entity_addr_t test_peer_addr,
                  bool test_peer_islocal) {
   ceph_assert_always(test_addr.is_msgr2());
+  ceph_assert_always(cmd_peer_addr.is_msgr2());
   ceph_assert_always(test_peer_addr.is_msgr2());
 
   if (test_peer_islocal) {
     // initiate crimson test peer locally
-    logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr);
-    return FailoverTestPeer::create(test_peer_addr
-    ).then([test_addr, test_peer_addr] (auto peer) {
-      return test_v2_protocol(test_addr, test_peer_addr, false
+    logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
+    return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
+    ).then([test_addr, cmd_peer_addr, test_peer_addr](auto peer) {
+      return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, false
       ).then([peer = std::move(peer)] () mutable {
         return peer->wait().then([peer = std::move(peer)] {});
       });
@@ -3484,7 +3485,8 @@ test_v2_protocol(entity_addr_t test_addr,
     });
   }
 
-  return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) {
+  return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
+  ).then([](auto test) {
     return seastar::futurize_invoke([test] {
       return test_v2_lossy_early_connect_fault(*test);
     }).then([test] {
@@ -3599,20 +3601,33 @@ seastar::future<int> do_test(seastar::app_template& app)
     verbose = config["verbose"].as<bool>();
     auto rounds = config["rounds"].as<unsigned>();
     auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+    auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+
     entity_addr_t test_addr;
     ceph_assert(test_addr.parse(
         config["test-addr"].as<std::string>().c_str(), nullptr));
-    entity_addr_t testpeer_addr;
-    ceph_assert(testpeer_addr.parse(
+    test_addr.set_nonce(TEST_NONCE);
+
+    entity_addr_t cmd_peer_addr;
+    ceph_assert(cmd_peer_addr.parse(
         config["testpeer-addr"].as<std::string>().c_str(), nullptr));
-    auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+    cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+
+    entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+
+    logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
+                  "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
+                  "testpeer_islocal={}",
+                  verbose, rounds, keepalive_ratio,
+                  test_addr, cmd_peer_addr, test_peer_addr,
+                  testpeer_islocal);
     return test_echo(rounds, keepalive_ratio)
     .then([] {
       return test_concurrent_dispatch();
     }).then([] {
       return test_preemptive_shutdown();
-    }).then([test_addr, testpeer_addr, testpeer_islocal] {
-      return test_v2_protocol(test_addr, testpeer_addr, testpeer_islocal);
+    }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal] {
+      return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal);
     }).then([] {
       logger().info("All tests succeeded");
       // Seastar has bugs to have events undispatched during shutdown,
@@ -3643,7 +3658,7 @@ int main(int argc, char** argv)
      "address of v2 failover tests")
     ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
      "addresses of v2 failover testpeer"
-     " (CmdSrv address and TestPeer address with port+=1)")
+     " (This is CmdSrv address, and TestPeer address is at port+=1)")
     ("testpeer-islocal", bpo::value<bool>()->default_value(true),
      "create a local crimson testpeer, or connect to a remote testpeer");
   return app.run(argc, argv, [&app] {
diff --git a/src/test/crimson/test_messenger.h b/src/test/crimson/test_messenger.h
new file mode 100644 (file)
index 0000000..75d060f
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/msg_types.h"
+
+namespace ceph::net::test {
+
+constexpr uint64_t CMD_CLI_NONCE = 1;
+constexpr int64_t CMD_CLI_OSD = 1;
+constexpr uint64_t TEST_NONCE = 2;
+constexpr int64_t TEST_OSD = 2;
+constexpr uint64_t CMD_SRV_NONCE = 3;
+constexpr int64_t CMD_SRV_OSD = 3;
+constexpr uint64_t TEST_PEER_NONCE = 4;
+constexpr int64_t TEST_PEER_OSD = 4;
+
+inline entity_addr_t get_test_peer_addr(
+    const entity_addr_t &cmd_peer_addr) {
+  entity_addr_t test_peer_addr = cmd_peer_addr;
+  test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+  test_peer_addr.set_nonce(TEST_PEER_NONCE);
+  return test_peer_addr;
+}
+
+enum class cmd_t : char {
+  none = '\0',
+  shutdown,
+  suite_start,
+  suite_stop,
+  suite_connect_me,
+  suite_send_me,
+  suite_keepalive_me,
+  suite_markdown,
+  suite_recv_op
+};
+
+enum class policy_t : char {
+  none = '\0',
+  stateful_server,
+  stateless_server,
+  lossless_peer,
+  lossless_peer_reuse,
+  lossy_client,
+  lossless_client
+};
+
+inline std::ostream& operator<<(std::ostream& out, const cmd_t& cmd) {
+  switch(cmd) {
+   case cmd_t::none:
+    return out << "none";
+   case cmd_t::shutdown:
+    return out << "shutdown";
+   case cmd_t::suite_start:
+    return out << "suite_start";
+   case cmd_t::suite_stop:
+    return out << "suite_stop";
+   case cmd_t::suite_connect_me:
+    return out << "suite_connect_me";
+   case cmd_t::suite_send_me:
+    return out << "suite_send_me";
+   case cmd_t::suite_keepalive_me:
+    return out << "suite_keepalive_me";
+   case cmd_t::suite_markdown:
+    return out << "suite_markdown";
+   case cmd_t::suite_recv_op:
+    return out << "suite_recv_op";
+   default:
+    ceph_abort();
+  }
+}
+
+inline std::ostream& operator<<(std::ostream& out, const policy_t& policy) {
+  switch(policy) {
+   case policy_t::none:
+    return out << "none";
+   case policy_t::stateful_server:
+    return out << "stateful_server";
+   case policy_t::stateless_server:
+    return out << "stateless_server";
+   case policy_t::lossless_peer:
+    return out << "lossless_peer";
+   case policy_t::lossless_peer_reuse:
+    return out << "lossless_peer_reuse";
+   case policy_t::lossy_client:
+    return out << "lossy_client";
+   case policy_t::lossless_client:
+    return out << "lossless_client";
+   default:
+    ceph_abort();
+  }
+}
+
+} // namespace ceph::net::test
index 0232262fc598d22d73a32e4b8849e6e280a951a7..34d7a7413fcac888fae93fd12d55f1e79b96da37 100644 (file)
 #include "msg/Dispatcher.h"
 #include "msg/Messenger.h"
 
-#include "test_cmds.h"
+#include "test_messenger.h"
 
 namespace {
 
 #define dout_subsys ceph_subsys_test
 
-using ceph::net::test::cmd_t;
-using ceph::net::test::policy_t;
+using namespace ceph::net::test;
 using SocketPolicy = Messenger::Policy;
 
 constexpr int CEPH_OSD_PROTOCOL = 10;
@@ -105,7 +104,11 @@ class FailoverSuitePeer : public Dispatcher {
 
  private:
   void init(entity_addr_t test_peer_addr, SocketPolicy policy) {
-    peer_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(4), "TestPeer", 4));
+    peer_msgr.reset(Messenger::create(
+      cct, "async",
+      entity_name_t::OSD(TEST_PEER_OSD),
+      "TestPeer",
+      TEST_PEER_NONCE));
     dummy_auth.auth_registry.refresh_config();
     peer_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
     peer_msgr->set_default_policy(policy);
@@ -361,7 +364,11 @@ class FailoverTestPeer : public Dispatcher {
   }
 
   void init(entity_addr_t cmd_peer_addr) {
-    cmd_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(3), "CmdSrv", 3));
+    cmd_msgr.reset(Messenger::create(
+      cct, "async",
+      entity_name_t::OSD(CMD_SRV_OSD),
+      "CmdSrv",
+      CMD_SRV_NONCE));
     dummy_auth.auth_registry.refresh_config();
     cmd_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
     cmd_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
@@ -384,11 +391,12 @@ class FailoverTestPeer : public Dispatcher {
   void wait() { cmd_msgr->wait(); }
 
   static std::unique_ptr<FailoverTestPeer>
-  create(CephContext* cct, entity_addr_t cmd_peer_addr, bool nonstop) {
-    // suite bind to cmd_peer_addr, with port + 1
-    entity_addr_t test_peer_addr = cmd_peer_addr;
-    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
-    auto test_peer = std::make_unique<FailoverTestPeer>(cct, test_peer_addr, nonstop);
+  create(CephContext* cct,
+         entity_addr_t cmd_peer_addr,
+         entity_addr_t test_peer_addr,
+         bool nonstop) {
+    auto test_peer = std::make_unique<FailoverTestPeer>(
+        cct, test_peer_addr, nonstop);
     test_peer->init(cmd_peer_addr);
     ldout(cct, 0) << "[CmdSrv] ready" << dendl;
     return test_peer;
@@ -404,7 +412,7 @@ int main(int argc, char** argv)
   desc.add_options()
     ("help,h", "show help message")
     ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9013"),
-     "CmdSrv address, and TestPeer address with port+=1")
+     "This is CmdSrv address, and TestPeer address is at port+=1")
     ("nonstop", po::value<bool>()->default_value(false),
      "Do not shutdown TestPeer when all tests are successful");
   po::variables_map vm;
@@ -426,12 +434,6 @@ int main(int argc, char** argv)
     return 1;
   }
 
-  auto addr = vm["addr"].as<std::string>();
-  entity_addr_t cmd_peer_addr;
-  cmd_peer_addr.parse(addr.c_str(), nullptr);
-  ceph_assert_always(cmd_peer_addr.is_msgr2());
-  auto nonstop = vm["nonstop"].as<bool>();
-
   std::vector<const char*> args(argv, argv + argc);
   auto cct = global_init(nullptr, args,
                          CEPH_ENTITY_TYPE_CLIENT,
@@ -439,6 +441,22 @@ int main(int argc, char** argv)
                          CINIT_FLAG_NO_MON_CONFIG);
   common_init_finish(cct.get());
 
-  auto test_peer = FailoverTestPeer::create(cct.get(), cmd_peer_addr, nonstop);
+  auto addr = vm["addr"].as<std::string>();
+  entity_addr_t cmd_peer_addr;
+  cmd_peer_addr.parse(addr.c_str(), nullptr);
+  cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+  ceph_assert_always(cmd_peer_addr.is_msgr2());
+  auto test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+  auto nonstop = vm["nonstop"].as<bool>();
+  ldout(cct, 0) << "test configuration: cmd_peer_addr=" << cmd_peer_addr
+                << ", test_peer_addr=" << test_peer_addr
+                << ", nonstop=" << nonstop
+                << dendl;
+
+  auto test_peer = FailoverTestPeer::create(
+      cct.get(),
+      cmd_peer_addr,
+      test_peer_addr,
+      nonstop);
   test_peer->wait();
 }