logger().info("client ms_dispatch {}", session->count);
}
- if (session->count == rounds) {
+ if (session->count > rounds) {
+ logger().error("{}: got {} pongs, more than expected {}", *c, session->count, rounds);
+ ceph_abort();
+ } else if (session->count == rounds) {
logger().info("{}: finished receiving {} pongs", *c, session->count);
session->finish_time = mono_clock::now();
gates.dispatch_in_background("echo_notify_done", [c, this] {
});
}
-static seastar::future<> test_concurrent_dispatch()
-{
- struct test_state {
- struct Server final
- : public crimson::net::Dispatcher {
- crimson::net::MessengerRef msgr;
- int count = 0;
- seastar::promise<> on_second; // satisfied on second dispatch
- seastar::promise<> on_done; // satisfied when first dispatch unblocks
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
- switch (++count) {
- case 1:
- // block on the first request until we reenter with the second
- std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
- break;
- case 2:
- on_second.set_value();
- break;
- default:
- throw std::runtime_error("unexpected count");
- }
- return {seastar::now()};
- }
-
- seastar::future<> wait() { return on_done.get_future(); }
-
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce,
- const entity_addr_t& addr) {
- msgr = crimson::net::Messenger::create(
- name, lname, nonce, true);
- msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
- return msgr->start({this});
- }, crimson::net::Messenger::bind_ertr::all_same_way(
- [addr] (const std::error_code& e) {
- logger().error("test_concurrent_dispatch(): "
- "there is another instance running at {}", addr);
- ceph_abort();
- }));
- }
- };
-
- struct Client final
- : public crimson::net::Dispatcher {
- crimson::net::MessengerRef msgr;
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
- return {seastar::now()};
- }
-
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce) {
- msgr = crimson::net::Messenger::create(
- name, lname, nonce, true);
- msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->start({this});
- }
- };
- };
-
- logger().info("test_concurrent_dispatch():");
- auto server = seastar::make_shared<test_state::Server>();
- auto client = seastar::make_shared<test_state::Client>();
- auto addr = get_server_addr();
- addr.set_type(entity_addr_t::TYPE_MSGR2);
- addr.set_family(AF_INET);
- return seastar::when_all_succeed(
- server->init(entity_name_t::OSD(4), "server3", 5, addr),
- client->init(entity_name_t::OSD(5), "client3", 6)
- ).then_unpack([server, client] {
- auto conn = client->msgr->connect(server->msgr->get_myaddr(),
- entity_name_t::TYPE_OSD);
- // send two messages
- return conn->send(crimson::make_message<MPing>()).then([conn] {
- return conn->send(crimson::make_message<MPing>());
- });
- }).then([server] {
- return server->wait();
- }).then([client] {
- logger().info("client shutdown...");
- client->msgr->stop();
- return client->msgr->shutdown();
- }).then([server] {
- logger().info("server shutdown...");
- server->msgr->stop();
- return server->msgr->shutdown();
- }).then([] {
- logger().info("test_concurrent_dispatch() done!\n");
- }).handle_exception([server, client] (auto eptr) {
- logger().error("test_concurrent_dispatch() failed: got exception {}", eptr);
- throw;
- });
-}
-
seastar::future<> test_preemptive_shutdown() {
struct test_state {
class Server final
logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
"test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
- "testpeer_islocal={}, peer_wins={}",
+ "testpeer_islocal={}, peer_wins={}, smp={}",
verbose, rounds, keepalive_ratio,
test_addr, cmd_peer_addr, test_peer_addr,
- testpeer_islocal, peer_wins);
+ testpeer_islocal, peer_wins,
+ seastar::smp::count);
return test_echo(rounds, keepalive_ratio
).then([] {
- return test_concurrent_dispatch();
- }).then([] {
return test_preemptive_shutdown();
}).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
return test_v2_protocol(