test_peer_addr(test_peer_addr),
interceptor(interceptor) { }
+ entity_addr_t get_addr() const {
+ return test_msgr.get_myaddr();
+ }
+
seastar::future<> shutdown() {
return test_msgr.shutdown();
}
cmd_msgr.set_auth_client(&dummy_auth);
cmd_msgr.set_auth_server(&dummy_auth);
return cmd_msgr.start(this).then([this, cmd_peer_addr] {
+ logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
}).then([this] (auto conn) {
cmd_conn = conn->release();
static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
- assert(cmd_peer_addr.is_msgr2());
return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0
- ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) {
+ ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) mutable {
+ test_addr.set_nonce(2);
+ cmd_peer_addr.set_nonce(3);
entity_addr_t test_peer_addr = cmd_peer_addr;
test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
test_peer_addr.set_nonce(4);
return FailoverSuite::create(
test_addr, test_policy_, test_peer_addr, interceptor
).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
+ ceph_assert(suite->get_addr() == test_addr);
test_suite.swap(suite);
return start_peer(peer_policy).then([this, f = std::move(f)] {
return f(*test_suite);
crimson::auth::DummyAuthClientServer dummy_auth;
Messenger& cmd_msgr;
ConnectionRef cmd_conn;
+ const entity_addr_t test_peer_addr;
std::unique_ptr<FailoverSuitePeer> test_suite;
seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
switch (cmd) {
case cmd_t::suite_start: {
ceph_assert(!test_suite);
- // suite bind to cmd_addr, with port + 1
- auto test_peer_addr = get_addr();
- test_peer_addr.set_port(get_addr().get_port() + 1);
auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
return FailoverSuitePeer::create(test_peer_addr, policy,
[this] { return notify_recv_op(); }
}
}
- seastar::future<> init(entity_addr_t cmd_addr) {
+ seastar::future<> init(entity_addr_t cmd_peer_addr) {
cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0));
cmd_msgr.set_auth_client(&dummy_auth);
cmd_msgr.set_auth_server(&dummy_auth);
- return cmd_msgr.bind(entity_addrvec_t{cmd_addr}).then([this] {
+ return cmd_msgr.bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
return cmd_msgr.start(this);
});
}
public:
- FailoverTestPeer(Messenger& cmd_msgr)
- : cmd_msgr(cmd_msgr) { }
-
- entity_addr_t get_addr() const {
- return cmd_msgr.get_myaddr();
- }
+ FailoverTestPeer(Messenger& cmd_msgr,
+ entity_addr_t test_peer_addr)
+ : cmd_msgr(cmd_msgr),
+ test_peer_addr(test_peer_addr) { }
seastar::future<> wait() {
return cmd_msgr.wait();
}
- static seastar::future<std::unique_ptr<FailoverTestPeer>> create() {
+ static seastar::future<std::unique_ptr<FailoverTestPeer>>
+ create(entity_addr_t cmd_peer_addr) {
return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0
- ).then([] (Messenger* cmd_msgr) {
- entity_addr_t cmd_addr;
- cmd_addr.parse("v2:127.0.0.1:9011", nullptr);
- auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr);
- return test_peer->init(cmd_addr
+ ).then([cmd_peer_addr] (Messenger* cmd_msgr) {
+ // suite bind to cmd_peer_addr, with port + 1
+ entity_addr_t test_peer_addr = cmd_peer_addr;
+ test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+ auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr, test_peer_addr);
+ return test_peer->init(cmd_peer_addr
).then([test_peer = std::move(test_peer)] () mutable {
logger().info("CmdSrv ready");
return std::move(test_peer);
}
seastar::future<>
-test_v2_protocol(entity_addr_t test_addr = entity_addr_t(),
- entity_addr_t cmd_peer_addr = entity_addr_t()) {
- if (test_addr == entity_addr_t() || cmd_peer_addr == entity_addr_t()) {
+test_v2_protocol(entity_addr_t test_addr,
+ entity_addr_t test_peer_addr,
+ bool test_peer_islocal) {
+ ceph_assert(test_addr.is_msgr2());
+ ceph_assert(test_peer_addr.is_msgr2());
+
+ if (test_peer_islocal) {
// initiate crimson test peer locally
- logger().info("test_v2_protocol: start local TestPeer...");
- return FailoverTestPeer::create().then([] (auto peer) {
- entity_addr_t test_addr_;
- test_addr_.parse("v2:127.0.0.1:9010");
- return test_v2_protocol(test_addr_, peer->get_addr()
+ logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr);
+ return FailoverTestPeer::create(test_peer_addr
+ ).then([test_addr, test_peer_addr] (auto peer) {
+ return test_v2_protocol(test_addr, test_peer_addr, false
).finally([peer = std::move(peer)] () mutable {
return peer->wait().then([peer = std::move(peer)] {});
});
});
}
- test_addr.set_nonce(2);
- return FailoverTest::create(cmd_peer_addr, test_addr).then([] (auto test) {
+ return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) {
return seastar::futurize_apply([test] {
return test_v2_lossy_early_connect_fault(*test);
}).then([test] {
("rounds", bpo::value<unsigned>()->default_value(512),
"number of pingpong rounds")
("keepalive-ratio", bpo::value<double>()->default_value(0.1),
- "ratio of keepalive in ping messages");
+ "ratio of keepalive in ping messages")
+ ("v2-test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
+ "address of v2 failover tests")
+ ("v2-testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
+ "addresses of v2 failover testpeer"
+ " (CmdSrv address and TestPeer address with port+=1)")
+ ("v2-testpeer-islocal", bpo::value<bool>()->default_value(true),
+ "create a local crimson testpeer, or connect to a remote testpeer");
return app.run(argc, argv, [&app] {
auto&& config = app.configuration();
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ entity_addr_t v2_test_addr;
+ ceph_assert(v2_test_addr.parse(
+ config["v2-test-addr"].as<std::string>().c_str(), nullptr));
+ entity_addr_t v2_testpeer_addr;
+ ceph_assert(v2_testpeer_addr.parse(
+ config["v2-testpeer-addr"].as<std::string>().c_str(), nullptr));
+ auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
return test_echo(rounds, keepalive_ratio, false)
.then([rounds, keepalive_ratio] {
return test_echo(rounds, keepalive_ratio, true);
return test_preemptive_shutdown(false);
}).then([] {
return test_preemptive_shutdown(true);
- }).then([] {
- return test_v2_protocol();
+ }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
+ return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
}).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {