From 0bf1ac63b597674828162b0a056c721cc5e2d0b2 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 21 Jun 2019 16:26:48 +0800 Subject: [PATCH] crimson/net: implement preemptive shutdown/close Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 8 +- src/test/crimson/test_messenger.cc | 143 +++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 4 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 31ec925f812..65bfe2d1b79 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -55,10 +55,10 @@ seastar::future<> Protocol::close() assert(!close_ready.valid()); if (socket) { - close_ready = socket->close() - .then([this] { - return pending_dispatch.close(); - }).finally(std::move(cleanup)); + socket->shutdown(); + close_ready = pending_dispatch.close().finally([this] { + return socket->close(); + }).finally(std::move(cleanup)); } else { close_ready = pending_dispatch.close().finally(std::move(cleanup)); } diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 924a7f83905..be97c82bd1a 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -434,6 +434,145 @@ static seastar::future<> test_concurrent_dispatch(bool v2) }); } +seastar::future<> test_preemptive_shutdown(bool v2) { + struct test_state { + class Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + ceph::net::Messenger *msgr = nullptr; + ceph::auth::DummyAuthClientServer dummy_auth; + + seastar::future<> ms_dispatch(ceph::net::Connection* c, + MessageRef m) override { + return c->send(MessageRef{new MPing, false}); + } + + public: + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const entity_addr_t& addr) { + return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id() + ).then([this, addr](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0)); + server.msgr->set_auth_client(&server.dummy_auth); + server.msgr->set_auth_server(&server.dummy_auth); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + entity_addr_t get_addr() const { + return msgr->get_myaddr(); + } + seastar::future<> shutdown() { + return msgr->shutdown(); + } + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::now(); + } + }; + + class Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + ceph::net::Messenger *msgr = nullptr; + ceph::auth::DummyAuthClientServer dummy_auth; + + bool stop_send = false; + seastar::promise<> stopped_send_promise; + + seastar::future<> ms_dispatch(ceph::net::Connection* c, + MessageRef m) override { + return seastar::now(); + } + + public: + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id() + ).then([this](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0)); + client.msgr->set_auth_client(&client.dummy_auth); + client.msgr->set_auth_server(&client.dummy_auth); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + seastar::future<> send_pings(const entity_addr_t& addr) { + return msgr->connect(addr, entity_name_t::TYPE_OSD + ).then([this](ceph::net::ConnectionXRef conn) { + seastar::do_until( + [this] { return stop_send; }, + [this, conn = &**conn] { + return conn->send(MessageRef{new MPing, false}).then([] { + return seastar::sleep(0ms); + }); + } + ).then_wrapped([this, conn] (auto fut) { + fut.forward_to(std::move(stopped_send_promise)); + }); + }); + } + seastar::future<> shutdown() { + return msgr->shutdown().then([this] { + stop_send = true; + return stopped_send_promise.get_future(); + }); + } + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::now(); + } + }; + }; + + logger().info("test_preemptive_shutdown(v2={}):", v2); + return seastar::when_all_succeed( + ceph::net::create_sharded(), + ceph::net::create_sharded() + ).then([v2](test_state::Server *server, + test_state::Client *client) { + entity_addr_t addr; + addr.parse("127.0.0.1:9010", nullptr); + if (v2) { + addr.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr.set_type(entity_addr_t::TYPE_LEGACY); + } + addr.set_family(AF_INET); + return seastar::when_all_succeed( + server->init(entity_name_t::OSD(6), "server4", 7, addr), + client->init(entity_name_t::OSD(7), "client4", 8) + ).then([server, client] { + return client->send_pings(server->get_addr()); + }).then([] { + return seastar::sleep(100ms); + }).then([client] { + logger().info("client shutdown..."); + return client->shutdown(); + }).finally([server] { + logger().info("server shutdown..."); + return server->shutdown(); + }).finally([] { + logger().info("test_preemptive_shutdown() done!\n"); + }); + }); +} + } int main(int argc, char** argv) @@ -458,6 +597,10 @@ int main(int argc, char** argv) return test_concurrent_dispatch(false); }).then([] { return test_concurrent_dispatch(true); + }).then([] { + return test_preemptive_shutdown(false); + }).then([] { + return test_preemptive_shutdown(true); }).then([] { std::cout << "All tests succeeded" << std::endl; }).handle_exception([] (auto eptr) { -- 2.39.5