]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: allow v2 failover tests to connect to remote TestPeer
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 23 Oct 2019 02:48:54 +0000 (10:48 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 28 Oct 2019 02:30:18 +0000 (10:30 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index d52868ce23813c8165df16762789ad8d2cd9efa6..0ee7f2876b0f5a21c967eabd196e7e066bb0f41c 100644 (file)
@@ -1144,6 +1144,10 @@ class FailoverSuite : public Dispatcher {
       test_peer_addr(test_peer_addr),
       interceptor(interceptor) { }
 
+  entity_addr_t get_addr() const {
+    return test_msgr.get_myaddr();
+  }
+
   seastar::future<> shutdown() {
     return test_msgr.shutdown();
   }
@@ -1375,6 +1379,7 @@ class FailoverTest : public Dispatcher {
     cmd_msgr.set_auth_client(&dummy_auth);
     cmd_msgr.set_auth_server(&dummy_auth);
     return cmd_msgr.start(this).then([this, cmd_peer_addr] {
+      logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
       return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
     }).then([this] (auto conn) {
       cmd_conn = conn->release();
@@ -1404,9 +1409,10 @@ class FailoverTest : public Dispatcher {
 
   static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
   create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
-    assert(cmd_peer_addr.is_msgr2());
     return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0
-    ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) {
+    ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) mutable {
+      test_addr.set_nonce(2);
+      cmd_peer_addr.set_nonce(3);
       entity_addr_t test_peer_addr = cmd_peer_addr;
       test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
       test_peer_addr.set_nonce(4);
@@ -1433,6 +1439,7 @@ class FailoverTest : public Dispatcher {
     return FailoverSuite::create(
         test_addr, test_policy_, test_peer_addr, interceptor
     ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
+      ceph_assert(suite->get_addr() == test_addr);
       test_suite.swap(suite);
       return start_peer(peer_policy).then([this, f = std::move(f)] {
         return f(*test_suite);
@@ -1626,6 +1633,7 @@ class FailoverTestPeer : public Dispatcher {
   crimson::auth::DummyAuthClientServer dummy_auth;
   Messenger& cmd_msgr;
   ConnectionRef cmd_conn;
+  const entity_addr_t test_peer_addr;
   std::unique_ptr<FailoverSuitePeer> test_suite;
 
   seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
@@ -1669,9 +1677,6 @@ class FailoverTestPeer : public Dispatcher {
     switch (cmd) {
      case cmd_t::suite_start: {
       ceph_assert(!test_suite);
-      // suite bind to cmd_addr, with port + 1
-      auto test_peer_addr = get_addr();
-      test_peer_addr.set_port(get_addr().get_port() + 1);
       auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
       return FailoverSuitePeer::create(test_peer_addr, policy,
                                        [this] { return notify_recv_op(); }
@@ -1706,34 +1711,34 @@ class FailoverTestPeer : public Dispatcher {
     }
   }
 
-  seastar::future<> init(entity_addr_t cmd_addr) {
+  seastar::future<> init(entity_addr_t cmd_peer_addr) {
     cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0));
     cmd_msgr.set_auth_client(&dummy_auth);
     cmd_msgr.set_auth_server(&dummy_auth);
-    return cmd_msgr.bind(entity_addrvec_t{cmd_addr}).then([this] {
+    return cmd_msgr.bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
       return cmd_msgr.start(this);
     });
   }
 
  public:
-  FailoverTestPeer(Messenger& cmd_msgr)
-    : cmd_msgr(cmd_msgr) { }
-
-  entity_addr_t get_addr() const {
-    return cmd_msgr.get_myaddr();
-  }
+  FailoverTestPeer(Messenger& cmd_msgr,
+                   entity_addr_t test_peer_addr)
+    : cmd_msgr(cmd_msgr),
+      test_peer_addr(test_peer_addr) { }
 
   seastar::future<> wait() {
     return cmd_msgr.wait();
   }
 
-  static seastar::future<std::unique_ptr<FailoverTestPeer>> create() {
+  static seastar::future<std::unique_ptr<FailoverTestPeer>>
+  create(entity_addr_t cmd_peer_addr) {
     return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0
-    ).then([] (Messenger* cmd_msgr) {
-      entity_addr_t cmd_addr;
-      cmd_addr.parse("v2:127.0.0.1:9011", nullptr);
-      auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr);
-      return test_peer->init(cmd_addr
+    ).then([cmd_peer_addr] (Messenger* cmd_msgr) {
+      // suite bind to cmd_peer_addr, with port + 1
+      entity_addr_t test_peer_addr = cmd_peer_addr;
+      test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+      auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr, test_peer_addr);
+      return test_peer->init(cmd_peer_addr
       ).then([test_peer = std::move(test_peer)] () mutable {
         logger().info("CmdSrv ready");
         return std::move(test_peer);
@@ -3544,15 +3549,18 @@ test_v2_lossless_peer_acceptor(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_protocol(entity_addr_t test_addr = entity_addr_t(),
-                 entity_addr_t cmd_peer_addr = entity_addr_t()) {
-  if (test_addr == entity_addr_t() || cmd_peer_addr == entity_addr_t()) {
+test_v2_protocol(entity_addr_t test_addr,
+                 entity_addr_t test_peer_addr,
+                 bool test_peer_islocal) {
+  ceph_assert(test_addr.is_msgr2());
+  ceph_assert(test_peer_addr.is_msgr2());
+
+  if (test_peer_islocal) {
     // initiate crimson test peer locally
-    logger().info("test_v2_protocol: start local TestPeer...");
-    return FailoverTestPeer::create().then([] (auto peer) {
-      entity_addr_t test_addr_;
-      test_addr_.parse("v2:127.0.0.1:9010");
-      return test_v2_protocol(test_addr_, peer->get_addr()
+    logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr);
+    return FailoverTestPeer::create(test_peer_addr
+    ).then([test_addr, test_peer_addr] (auto peer) {
+      return test_v2_protocol(test_addr, test_peer_addr, false
       ).finally([peer = std::move(peer)] () mutable {
         return peer->wait().then([peer = std::move(peer)] {});
       });
@@ -3562,8 +3570,7 @@ test_v2_protocol(entity_addr_t test_addr = entity_addr_t(),
     });
   }
 
-  test_addr.set_nonce(2);
-  return FailoverTest::create(cmd_peer_addr, test_addr).then([] (auto test) {
+  return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) {
     return seastar::futurize_apply([test] {
       return test_v2_lossy_early_connect_fault(*test);
     }).then([test] {
@@ -3670,12 +3677,26 @@ int main(int argc, char** argv)
     ("rounds", bpo::value<unsigned>()->default_value(512),
      "number of pingpong rounds")
     ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
-     "ratio of keepalive in ping messages");
+     "ratio of keepalive in ping messages")
+    ("v2-test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
+     "address of v2 failover tests")
+    ("v2-testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
+     "addresses of v2 failover testpeer"
+     " (CmdSrv address and TestPeer address with port+=1)")
+    ("v2-testpeer-islocal", bpo::value<bool>()->default_value(true),
+     "create a local crimson testpeer, or connect to a remote testpeer");
   return app.run(argc, argv, [&app] {
     auto&& config = app.configuration();
     verbose = config["verbose"].as<bool>();
     auto rounds = config["rounds"].as<unsigned>();
     auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+    entity_addr_t v2_test_addr;
+    ceph_assert(v2_test_addr.parse(
+          config["v2-test-addr"].as<std::string>().c_str(), nullptr));
+    entity_addr_t v2_testpeer_addr;
+    ceph_assert(v2_testpeer_addr.parse(
+          config["v2-testpeer-addr"].as<std::string>().c_str(), nullptr));
+    auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
     return test_echo(rounds, keepalive_ratio, false)
     .then([rounds, keepalive_ratio] {
       return test_echo(rounds, keepalive_ratio, true);
@@ -3687,8 +3708,8 @@ int main(int argc, char** argv)
       return test_preemptive_shutdown(false);
     }).then([] {
       return test_preemptive_shutdown(true);
-    }).then([] {
-      return test_v2_protocol();
+    }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
+      return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
     }).then([] {
       std::cout << "All tests succeeded" << std::endl;
     }).handle_exception([] (auto eptr) {