From 24bfdc08f7796a4ab2e5ee2047277a82b1a124ff Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 23 Oct 2019 10:48:54 +0800 Subject: [PATCH] test/crimson: allow v2 failover tests to connect to remote TestPeer Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 85 +++++++++++++++++++----------- 1 file changed, 53 insertions(+), 32 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index d52868ce23813..0ee7f2876b0f5 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1144,6 +1144,10 @@ class FailoverSuite : public Dispatcher { test_peer_addr(test_peer_addr), interceptor(interceptor) { } + entity_addr_t get_addr() const { + return test_msgr.get_myaddr(); + } + seastar::future<> shutdown() { return test_msgr.shutdown(); } @@ -1375,6 +1379,7 @@ class FailoverTest : public Dispatcher { cmd_msgr.set_auth_client(&dummy_auth); cmd_msgr.set_auth_server(&dummy_auth); return cmd_msgr.start(this).then([this, cmd_peer_addr] { + logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr); return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD); }).then([this] (auto conn) { cmd_conn = conn->release(); @@ -1404,9 +1409,10 @@ class FailoverTest : public Dispatcher { static seastar::future> create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) { - assert(cmd_peer_addr.is_msgr2()); return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0 - ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) { + ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) mutable { + test_addr.set_nonce(2); + cmd_peer_addr.set_nonce(3); entity_addr_t test_peer_addr = cmd_peer_addr; test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); test_peer_addr.set_nonce(4); @@ -1433,6 +1439,7 @@ class FailoverTest : public Dispatcher { return FailoverSuite::create( test_addr, test_policy_, test_peer_addr, interceptor ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable { + ceph_assert(suite->get_addr() == test_addr); test_suite.swap(suite); return start_peer(peer_policy).then([this, f = std::move(f)] { return f(*test_suite); @@ -1626,6 +1633,7 @@ class FailoverTestPeer : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; Messenger& cmd_msgr; ConnectionRef cmd_conn; + const entity_addr_t test_peer_addr; std::unique_ptr test_suite; seastar::future<> ms_dispatch(Connection* c, MessageRef m) override { @@ -1669,9 +1677,6 @@ class FailoverTestPeer : public Dispatcher { switch (cmd) { case cmd_t::suite_start: { ceph_assert(!test_suite); - // suite bind to cmd_addr, with port + 1 - auto test_peer_addr = get_addr(); - test_peer_addr.set_port(get_addr().get_port() + 1); auto policy = to_socket_policy(static_cast(m_cmd->cmd[1][0])); return FailoverSuitePeer::create(test_peer_addr, policy, [this] { return notify_recv_op(); } @@ -1706,34 +1711,34 @@ class FailoverTestPeer : public Dispatcher { } } - seastar::future<> init(entity_addr_t cmd_addr) { + seastar::future<> init(entity_addr_t cmd_peer_addr) { cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0)); cmd_msgr.set_auth_client(&dummy_auth); cmd_msgr.set_auth_server(&dummy_auth); - return cmd_msgr.bind(entity_addrvec_t{cmd_addr}).then([this] { + return cmd_msgr.bind(entity_addrvec_t{cmd_peer_addr}).then([this] { return cmd_msgr.start(this); }); } public: - FailoverTestPeer(Messenger& cmd_msgr) - : cmd_msgr(cmd_msgr) { } - - entity_addr_t get_addr() const { - return cmd_msgr.get_myaddr(); - } + FailoverTestPeer(Messenger& cmd_msgr, + entity_addr_t test_peer_addr) + : cmd_msgr(cmd_msgr), + test_peer_addr(test_peer_addr) { } seastar::future<> wait() { return cmd_msgr.wait(); } - static seastar::future> create() { + static seastar::future> + create(entity_addr_t cmd_peer_addr) { return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0 - ).then([] (Messenger* cmd_msgr) { - entity_addr_t cmd_addr; - cmd_addr.parse("v2:127.0.0.1:9011", nullptr); - auto test_peer = std::make_unique(*cmd_msgr); - return test_peer->init(cmd_addr + ).then([cmd_peer_addr] (Messenger* cmd_msgr) { + // suite bind to cmd_peer_addr, with port + 1 + entity_addr_t test_peer_addr = cmd_peer_addr; + test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); + auto test_peer = std::make_unique(*cmd_msgr, test_peer_addr); + return test_peer->init(cmd_peer_addr ).then([test_peer = std::move(test_peer)] () mutable { logger().info("CmdSrv ready"); return std::move(test_peer); @@ -3544,15 +3549,18 @@ test_v2_lossless_peer_acceptor(FailoverTest& test) { } seastar::future<> -test_v2_protocol(entity_addr_t test_addr = entity_addr_t(), - entity_addr_t cmd_peer_addr = entity_addr_t()) { - if (test_addr == entity_addr_t() || cmd_peer_addr == entity_addr_t()) { +test_v2_protocol(entity_addr_t test_addr, + entity_addr_t test_peer_addr, + bool test_peer_islocal) { + ceph_assert(test_addr.is_msgr2()); + ceph_assert(test_peer_addr.is_msgr2()); + + if (test_peer_islocal) { // initiate crimson test peer locally - logger().info("test_v2_protocol: start local TestPeer..."); - return FailoverTestPeer::create().then([] (auto peer) { - entity_addr_t test_addr_; - test_addr_.parse("v2:127.0.0.1:9010"); - return test_v2_protocol(test_addr_, peer->get_addr() + logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr); + return FailoverTestPeer::create(test_peer_addr + ).then([test_addr, test_peer_addr] (auto peer) { + return test_v2_protocol(test_addr, test_peer_addr, false ).finally([peer = std::move(peer)] () mutable { return peer->wait().then([peer = std::move(peer)] {}); }); @@ -3562,8 +3570,7 @@ test_v2_protocol(entity_addr_t test_addr = entity_addr_t(), }); } - test_addr.set_nonce(2); - return FailoverTest::create(cmd_peer_addr, test_addr).then([] (auto test) { + return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) { return seastar::futurize_apply([test] { return test_v2_lossy_early_connect_fault(*test); }).then([test] { @@ -3670,12 +3677,26 @@ int main(int argc, char** argv) ("rounds", bpo::value()->default_value(512), "number of pingpong rounds") ("keepalive-ratio", bpo::value()->default_value(0.1), - "ratio of keepalive in ping messages"); + "ratio of keepalive in ping messages") + ("v2-test-addr", bpo::value()->default_value("v2:127.0.0.1:9012"), + "address of v2 failover tests") + ("v2-testpeer-addr", bpo::value()->default_value("v2:127.0.0.1:9013"), + "addresses of v2 failover testpeer" + " (CmdSrv address and TestPeer address with port+=1)") + ("v2-testpeer-islocal", bpo::value()->default_value(true), + "create a local crimson testpeer, or connect to a remote testpeer"); return app.run(argc, argv, [&app] { auto&& config = app.configuration(); verbose = config["verbose"].as(); auto rounds = config["rounds"].as(); auto keepalive_ratio = config["keepalive-ratio"].as(); + entity_addr_t v2_test_addr; + ceph_assert(v2_test_addr.parse( + config["v2-test-addr"].as().c_str(), nullptr)); + entity_addr_t v2_testpeer_addr; + ceph_assert(v2_testpeer_addr.parse( + config["v2-testpeer-addr"].as().c_str(), nullptr)); + auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as(); return test_echo(rounds, keepalive_ratio, false) .then([rounds, keepalive_ratio] { return test_echo(rounds, keepalive_ratio, true); @@ -3687,8 +3708,8 @@ int main(int argc, char** argv) return test_preemptive_shutdown(false); }).then([] { return test_preemptive_shutdown(true); - }).then([] { - return test_v2_protocol(); + }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] { + return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal); }).then([] { std::cout << "All tests succeeded" << std::endl; }).handle_exception([] (auto eptr) { -- 2.39.5