#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>
-#include "test_cmds.h"
+#include "test_messenger.h"
using namespace std::chrono_literals;
namespace bpo = boost::program_options;
using crimson::net::MessengerRef;
using crimson::net::SocketPolicy;
using crimson::net::tag_bp_t;
-using ceph::net::test::cmd_t;
-using ceph::net::test::policy_t;
+using namespace ceph::net::test;
struct counter_t { unsigned counter = 0; };
}
private:
- seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+ // Bind test_msgr to test_addr under the given socket policy and start it.
+ // Aborts the process if the address is already bound by another instance,
+ // so a stale test process cannot silently shadow this one.
+ seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) {
test_msgr->set_default_policy(policy);
test_msgr->set_auth_client(&dummy_auth);
test_msgr->set_auth_server(&dummy_auth);
test_msgr->set_interceptor(&interceptor);
- return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+ return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] {
return test_msgr->start({this});
- }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+ }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) {
logger().error("FailoverSuite: "
- "there is another instance running at {}", addr);
+ "there is another instance running at {}", test_addr);
ceph_abort();
}));
}
entity_addr_t test_peer_addr,
const TestInterceptor& interceptor) {
auto suite = std::make_unique<FailoverSuite>(
- Messenger::create(entity_name_t::OSD(2), "Test", 2),
+ Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
test_peer_addr, interceptor);
return suite->init(test_addr, test_policy
).then([suite = std::move(suite)] () mutable {
}
static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
- create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
- test_addr.set_nonce(2);
- cmd_peer_addr.set_nonce(3);
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
- test_peer_addr.set_nonce(4);
+ create(entity_addr_t test_addr,
+ entity_addr_t cmd_peer_addr,
+ entity_addr_t test_peer_addr) {
auto test = seastar::make_lw_shared<FailoverTest>(
- Messenger::create(entity_name_t::OSD(1), "CmdCli", 1),
+ Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
test_addr, test_peer_addr);
return test->init(cmd_peer_addr).then([test] {
logger().info("CmdCli ready");
}
private:
- seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+ // Bind peer_msgr to test_peer_addr under the given socket policy and
+ // start it; aborts if another instance already holds the address.
+ seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) {
peer_msgr->set_default_policy(policy);
peer_msgr->set_auth_client(&dummy_auth);
peer_msgr->set_auth_server(&dummy_auth);
- return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+ return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] {
return peer_msgr->start({this});
- }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+ }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) {
logger().error("FailoverSuitePeer: "
- "there is another instance running at {}", test_peer_addr);
+ "there is another instance running at {}", test_peer_addr);
ceph_abort();
}));
}
return peer_msgr->shutdown();
}
- seastar::future<> connect_peer(entity_addr_t addr) {
- logger().info("[TestPeer] connect_peer({})", addr);
- auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD);
+ seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
+ logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
+ auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
if (tracked_conn) {
if (tracked_conn->is_closed()) {
ceph_assert(tracked_conn != new_tracked_conn);
}
static seastar::future<std::unique_ptr<FailoverSuitePeer>>
- create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
+ create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) {
auto suite = std::make_unique<FailoverSuitePeer>(
- Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback);
- return suite->init(addr, policy
+ Messenger::create(
+ entity_name_t::OSD(TEST_PEER_OSD),
+ "TestPeer",
+ TEST_PEER_NONCE),
+ op_callback
+ );
+ return suite->init(test_peer_addr, policy
).then([suite = std::move(suite)] () mutable {
return std::move(suite);
});
case cmd_t::suite_start: {
ceph_assert(!test_suite);
auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
- return FailoverSuitePeer::create(test_peer_addr, policy,
- [this] { return notify_recv_op(); }
+ return FailoverSuitePeer::create(
+ test_peer_addr, policy, [this] { return notify_recv_op(); }
).then([this] (auto suite) {
test_suite.swap(suite);
});
});
case cmd_t::suite_connect_me: {
ceph_assert(test_suite);
- entity_addr_t test_addr = entity_addr_t();
- test_addr.parse(m_cmd->cmd[1].c_str(), nullptr);
- return test_suite->connect_peer(test_addr);
+ entity_addr_t test_addr_decoded = entity_addr_t();
+ test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr);
+ return test_suite->connect_peer(test_addr_decoded);
}
case cmd_t::suite_send_me:
ceph_assert(test_suite);
}
static seastar::future<std::unique_ptr<FailoverTestPeer>>
- create(entity_addr_t cmd_peer_addr) {
- // suite bind to cmd_peer_addr, with port + 1
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+ create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
auto test_peer = std::make_unique<FailoverTestPeer>(
- Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr);
+ Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
+ test_peer_addr);
return test_peer->init(cmd_peer_addr
).then([test_peer = std::move(test_peer)] () mutable {
logger().info("CmdSrv ready");
seastar::future<>
test_v2_protocol(entity_addr_t test_addr,
+ entity_addr_t cmd_peer_addr,
entity_addr_t test_peer_addr,
bool test_peer_islocal) {
ceph_assert_always(test_addr.is_msgr2());
+ ceph_assert_always(cmd_peer_addr.is_msgr2());
ceph_assert_always(test_peer_addr.is_msgr2());
if (test_peer_islocal) {
// initiate crimson test peer locally
- logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr);
- return FailoverTestPeer::create(test_peer_addr
- ).then([test_addr, test_peer_addr] (auto peer) {
- return test_v2_protocol(test_addr, test_peer_addr, false
+ logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
+ return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
+ ).then([test_addr, cmd_peer_addr, test_peer_addr](auto peer) {
+ return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, false
).then([peer = std::move(peer)] () mutable {
return peer->wait().then([peer = std::move(peer)] {});
});
});
}
- return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) {
+ return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
+ ).then([](auto test) {
return seastar::futurize_invoke([test] {
return test_v2_lossy_early_connect_fault(*test);
}).then([test] {
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+
entity_addr_t test_addr;
ceph_assert(test_addr.parse(
config["test-addr"].as<std::string>().c_str(), nullptr));
- entity_addr_t testpeer_addr;
- ceph_assert(testpeer_addr.parse(
+ test_addr.set_nonce(TEST_NONCE);
+
+ entity_addr_t cmd_peer_addr;
+ ceph_assert(cmd_peer_addr.parse(
config["testpeer-addr"].as<std::string>().c_str(), nullptr));
- auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+ cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+
+ entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+
+ logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
+ "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
+ "testpeer_islocal={}",
+ verbose, rounds, keepalive_ratio,
+ test_addr, cmd_peer_addr, test_peer_addr,
+ testpeer_islocal);
return test_echo(rounds, keepalive_ratio)
.then([] {
return test_concurrent_dispatch();
}).then([] {
return test_preemptive_shutdown();
- }).then([test_addr, testpeer_addr, testpeer_islocal] {
- return test_v2_protocol(test_addr, testpeer_addr, testpeer_islocal);
+ }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal] {
+ return test_v2_protocol(test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal);
}).then([] {
logger().info("All tests succeeded");
// Seastar has bugs to have events undispatched during shutdown,
"address of v2 failover tests")
("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
"addresses of v2 failover testpeer"
- " (CmdSrv address and TestPeer address with port+=1)")
+ " (This is CmdSrv address, and TestPeer address is at port+=1)")
("testpeer-islocal", bpo::value<bool>()->default_value(true),
"create a local crimson testpeer, or connect to a remote testpeer");
return app.run(argc, argv, [&app] {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/msg_types.h"
+
+namespace ceph::net::test {
+
+// Fixed messenger nonces and OSD entity ids for the four participants of
+// the messenger failover tests.  They are shared here so the crimson test
+// side and the (possibly remote) test-peer side agree on identities:
+//   CmdCli  -- command client in the crimson test process
+//   Test    -- the messenger under test
+//   CmdSrv  -- command server in the test-peer process
+//   TestPeer-- the peer messenger the Test side talks to
+constexpr uint64_t CMD_CLI_NONCE = 1;
+constexpr int64_t CMD_CLI_OSD = 1;
+constexpr uint64_t TEST_NONCE = 2;
+constexpr int64_t TEST_OSD = 2;
+constexpr uint64_t CMD_SRV_NONCE = 3;
+constexpr int64_t CMD_SRV_OSD = 3;
+constexpr uint64_t TEST_PEER_NONCE = 4;
+constexpr int64_t TEST_PEER_OSD = 4;
+
+// Derive the TestPeer address from the CmdSrv (cmd_peer) address: same
+// host, port + 1, with the TestPeer nonce applied.  Both processes use
+// this helper so the derived address always matches.
+inline entity_addr_t get_test_peer_addr(
+    const entity_addr_t &cmd_peer_addr) {
+  entity_addr_t test_peer_addr = cmd_peer_addr;
+  test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+  test_peer_addr.set_nonce(TEST_PEER_NONCE);
+  return test_peer_addr;
+}
+
+// Commands sent from CmdCli to CmdSrv to drive the failover test peer.
+// Encoded as a single char in the command message payload.
+enum class cmd_t : char {
+  none = '\0',
+  shutdown,
+  suite_start,
+  suite_stop,
+  suite_connect_me,
+  suite_send_me,
+  suite_keepalive_me,
+  suite_markdown,
+  suite_recv_op
+};
+
+// Socket policy selector carried in a suite_start command; mapped to a
+// concrete Messenger policy by the receiving side.
+enum class policy_t : char {
+  none = '\0',
+  stateful_server,
+  stateless_server,
+  lossless_peer,
+  lossless_peer_reuse,
+  lossy_client,
+  lossless_client
+};
+
+// Human-readable logging of cmd_t; aborts on values outside the enum.
+inline std::ostream& operator<<(std::ostream& out, const cmd_t& cmd) {
+  switch(cmd) {
+  case cmd_t::none:
+    return out << "none";
+  case cmd_t::shutdown:
+    return out << "shutdown";
+  case cmd_t::suite_start:
+    return out << "suite_start";
+  case cmd_t::suite_stop:
+    return out << "suite_stop";
+  case cmd_t::suite_connect_me:
+    return out << "suite_connect_me";
+  case cmd_t::suite_send_me:
+    return out << "suite_send_me";
+  case cmd_t::suite_keepalive_me:
+    return out << "suite_keepalive_me";
+  case cmd_t::suite_markdown:
+    return out << "suite_markdown";
+  case cmd_t::suite_recv_op:
+    return out << "suite_recv_op";
+  default:
+    ceph_abort();
+  }
+}
+
+// Human-readable logging of policy_t; aborts on values outside the enum.
+inline std::ostream& operator<<(std::ostream& out, const policy_t& policy) {
+  switch(policy) {
+  case policy_t::none:
+    return out << "none";
+  case policy_t::stateful_server:
+    return out << "stateful_server";
+  case policy_t::stateless_server:
+    return out << "stateless_server";
+  case policy_t::lossless_peer:
+    return out << "lossless_peer";
+  case policy_t::lossless_peer_reuse:
+    return out << "lossless_peer_reuse";
+  case policy_t::lossy_client:
+    return out << "lossy_client";
+  case policy_t::lossless_client:
+    return out << "lossless_client";
+  default:
+    ceph_abort();
+  }
+}
+
+} // namespace ceph::net::test
#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
-#include "test_cmds.h"
+#include "test_messenger.h"
namespace {
#define dout_subsys ceph_subsys_test
-using ceph::net::test::cmd_t;
-using ceph::net::test::policy_t;
+using namespace ceph::net::test;
using SocketPolicy = Messenger::Policy;
constexpr int CEPH_OSD_PROTOCOL = 10;
private:
void init(entity_addr_t test_peer_addr, SocketPolicy policy) {
- peer_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(4), "TestPeer", 4));
+ peer_msgr.reset(Messenger::create(
+ cct, "async",
+ entity_name_t::OSD(TEST_PEER_OSD),
+ "TestPeer",
+ TEST_PEER_NONCE));
dummy_auth.auth_registry.refresh_config();
peer_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
peer_msgr->set_default_policy(policy);
}
void init(entity_addr_t cmd_peer_addr) {
- cmd_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(3), "CmdSrv", 3));
+ cmd_msgr.reset(Messenger::create(
+ cct, "async",
+ entity_name_t::OSD(CMD_SRV_OSD),
+ "CmdSrv",
+ CMD_SRV_NONCE));
dummy_auth.auth_registry.refresh_config();
cmd_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cmd_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
void wait() { cmd_msgr->wait(); }
static std::unique_ptr<FailoverTestPeer>
- create(CephContext* cct, entity_addr_t cmd_peer_addr, bool nonstop) {
- // suite bind to cmd_peer_addr, with port + 1
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
- auto test_peer = std::make_unique<FailoverTestPeer>(cct, test_peer_addr, nonstop);
+ create(CephContext* cct,
+ entity_addr_t cmd_peer_addr,
+ entity_addr_t test_peer_addr,
+ bool nonstop) {
+ auto test_peer = std::make_unique<FailoverTestPeer>(
+ cct, test_peer_addr, nonstop);
test_peer->init(cmd_peer_addr);
ldout(cct, 0) << "[CmdSrv] ready" << dendl;
return test_peer;
desc.add_options()
("help,h", "show help message")
("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9013"),
- "CmdSrv address, and TestPeer address with port+=1")
+ "This is CmdSrv address, and TestPeer address is at port+=1")
("nonstop", po::value<bool>()->default_value(false),
"Do not shutdown TestPeer when all tests are successful");
po::variables_map vm;
return 1;
}
- auto addr = vm["addr"].as<std::string>();
- entity_addr_t cmd_peer_addr;
- cmd_peer_addr.parse(addr.c_str(), nullptr);
- ceph_assert_always(cmd_peer_addr.is_msgr2());
- auto nonstop = vm["nonstop"].as<bool>();
-
std::vector<const char*> args(argv, argv + argc);
auto cct = global_init(nullptr, args,
CEPH_ENTITY_TYPE_CLIENT,
CINIT_FLAG_NO_MON_CONFIG);
common_init_finish(cct.get());
- auto test_peer = FailoverTestPeer::create(cct.get(), cmd_peer_addr, nonstop);
+ auto addr = vm["addr"].as<std::string>();
+ entity_addr_t cmd_peer_addr;
+ cmd_peer_addr.parse(addr.c_str(), nullptr);
+ cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+ ceph_assert_always(cmd_peer_addr.is_msgr2());
+ auto test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+ auto nonstop = vm["nonstop"].as<bool>();
+ ldout(cct, 0) << "test configuration: cmd_peer_addr=" << cmd_peer_addr
+ << ", test_peer_addr=" << test_peer_addr
+ << ", nonstop=" << nonstop
+ << dendl;
+
+ auto test_peer = FailoverTestPeer::create(
+ cct.get(),
+ cmd_peer_addr,
+ test_peer_addr,
+ nonstop);
test_peer->wait();
}