});
}
+seastar::future<> test_preemptive_shutdown(bool v2) {
+ struct test_state {
+ class Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+ ceph::auth::DummyAuthClientServer dummy_auth;
+
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
+ MessageRef m) override {
+ return c->send(MessageRef{new MPing, false});
+ }
+
+ public:
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
+ ).then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
+ server.msgr->set_auth_client(&server.dummy_auth);
+ server.msgr->set_auth_server(&server.dummy_auth);
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+ entity_addr_t get_addr() const {
+ return msgr->get_myaddr();
+ }
+ seastar::future<> shutdown() {
+ return msgr->shutdown();
+ }
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::now();
+ }
+ };
+
+ class Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ ceph::net::Messenger *msgr = nullptr;
+ ceph::auth::DummyAuthClientServer dummy_auth;
+
+ bool stop_send = false;
+ seastar::promise<> stopped_send_promise;
+
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
+ MessageRef m) override {
+ return seastar::now();
+ }
+
+ public:
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
+ ).then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
+ client.msgr->set_auth_client(&client.dummy_auth);
+ client.msgr->set_auth_server(&client.dummy_auth);
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+ seastar::future<> send_pings(const entity_addr_t& addr) {
+ return msgr->connect(addr, entity_name_t::TYPE_OSD
+ ).then([this](ceph::net::ConnectionXRef conn) {
+ seastar::do_until(
+ [this] { return stop_send; },
+ [this, conn = &**conn] {
+ return conn->send(MessageRef{new MPing, false}).then([] {
+ return seastar::sleep(0ms);
+ });
+ }
+ ).then_wrapped([this, conn] (auto fut) {
+ fut.forward_to(std::move(stopped_send_promise));
+ });
+ });
+ }
+ seastar::future<> shutdown() {
+ return msgr->shutdown().then([this] {
+ stop_send = true;
+ return stopped_send_promise.get_future();
+ });
+ }
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::now();
+ }
+ };
+ };
+
+ logger().info("test_preemptive_shutdown(v2={}):", v2);
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>()
+ ).then([v2](test_state::Server *server,
+ test_state::Client *client) {
+ entity_addr_t addr;
+ addr.parse("127.0.0.1:9010", nullptr);
+ if (v2) {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ } else {
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ }
+ addr.set_family(AF_INET);
+ return seastar::when_all_succeed(
+ server->init(entity_name_t::OSD(6), "server4", 7, addr),
+ client->init(entity_name_t::OSD(7), "client4", 8)
+ ).then([server, client] {
+ return client->send_pings(server->get_addr());
+ }).then([] {
+ return seastar::sleep(100ms);
+ }).then([client] {
+ logger().info("client shutdown...");
+ return client->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->shutdown();
+ }).finally([] {
+ logger().info("test_preemptive_shutdown() done!\n");
+ });
+ });
+}
+
}
int main(int argc, char** argv)
return test_concurrent_dispatch(false);
}).then([] {
return test_concurrent_dispatch(true);
+ }).then([] {
+ return test_preemptive_shutdown(false);
+ }).then([] {
+ return test_preemptive_shutdown(true);
}).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {