]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: implement preemptive shutdown/close
authorYingxin Cheng <yingxincheng@gmail.com>
Fri, 21 Jun 2019 08:26:48 +0000 (16:26 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 10 Jul 2019 08:41:02 +0000 (16:41 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Protocol.cc
src/test/crimson/test_messenger.cc

index 31ec925f812f2800573cd4fe1c0b091987652fad..65bfe2d1b790ccfe1b214beb96e08aab5a84d79c 100644 (file)
@@ -55,10 +55,10 @@ seastar::future<> Protocol::close()
   assert(!close_ready.valid());
 
   if (socket) {
-    close_ready = socket->close()
-      .then([this] {
-        return pending_dispatch.close();
-      }).finally(std::move(cleanup));
+    socket->shutdown();
+    close_ready = pending_dispatch.close().finally([this] {
+      return socket->close();
+    }).finally(std::move(cleanup));
   } else {
     close_ready = pending_dispatch.close().finally(std::move(cleanup));
   }
index 924a7f839057b13d28b97ea80b6951b6f3aedf7b..be97c82bd1aeb25d2c67684f745dff99917b0f10 100644 (file)
@@ -434,6 +434,145 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
     });
 }
 
+seastar::future<> test_preemptive_shutdown(bool v2) {
+  struct test_state {
+    class Server final
+      : public ceph::net::Dispatcher,
+        public seastar::peering_sharded_service<Server> {
+      ceph::net::Messenger *msgr = nullptr;
+      ceph::auth::DummyAuthClientServer dummy_auth;
+
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
+                                    MessageRef m) override {
+        return c->send(MessageRef{new MPing, false});
+      }
+
+     public:
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce,
+                             const entity_addr_t& addr) {
+        return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
+        ).then([this, addr](ceph::net::Messenger *messenger) {
+          return container().invoke_on_all([messenger](auto& server) {
+            server.msgr = messenger->get_local_shard();
+            server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
+            server.msgr->set_auth_client(&server.dummy_auth);
+            server.msgr->set_auth_server(&server.dummy_auth);
+          }).then([messenger, addr] {
+            return messenger->bind(entity_addrvec_t{addr});
+          }).then([this, messenger] {
+            return messenger->start(this);
+          });
+        });
+      }
+      entity_addr_t get_addr() const {
+        return msgr->get_myaddr();
+      }
+      seastar::future<> shutdown() {
+        return msgr->shutdown();
+      }
+      Dispatcher* get_local_shard() override {
+        return &(container().local());
+      }
+      seastar::future<> stop() {
+        return seastar::now();
+      }
+    };
+
+    class Client final
+      : public ceph::net::Dispatcher,
+        public seastar::peering_sharded_service<Client> {
+      ceph::net::Messenger *msgr = nullptr;
+      ceph::auth::DummyAuthClientServer dummy_auth;
+
+      bool stop_send = false;
+      seastar::promise<> stopped_send_promise;
+
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
+                                    MessageRef m) override {
+        return seastar::now();
+      }
+
+     public:
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce) {
+        return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
+        ).then([this](ceph::net::Messenger *messenger) {
+          return container().invoke_on_all([messenger](auto& client) {
+            client.msgr = messenger->get_local_shard();
+            client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
+            client.msgr->set_auth_client(&client.dummy_auth);
+            client.msgr->set_auth_server(&client.dummy_auth);
+          }).then([this, messenger] {
+            return messenger->start(this);
+          });
+        });
+      }
+      seastar::future<> send_pings(const entity_addr_t& addr) {
+        return msgr->connect(addr, entity_name_t::TYPE_OSD
+        ).then([this](ceph::net::ConnectionXRef conn) {
+          seastar::do_until(
+            [this] { return stop_send; },
+            [this, conn = &**conn] {
+              return conn->send(MessageRef{new MPing, false}).then([] {
+                return seastar::sleep(0ms);
+              });
+            }
+          ).then_wrapped([this, conn] (auto fut) {
+            fut.forward_to(std::move(stopped_send_promise));
+          });
+        });
+      }
+      seastar::future<> shutdown() {
+        return msgr->shutdown().then([this] {
+          stop_send = true;
+          return stopped_send_promise.get_future();
+        });
+      }
+      Dispatcher* get_local_shard() override {
+        return &(container().local());
+      }
+      seastar::future<> stop() {
+        return seastar::now();
+      }
+    };
+  };
+
+  logger().info("test_preemptive_shutdown(v2={}):", v2);
+  return seastar::when_all_succeed(
+    ceph::net::create_sharded<test_state::Server>(),
+    ceph::net::create_sharded<test_state::Client>()
+  ).then([v2](test_state::Server *server,
+             test_state::Client *client) {
+    entity_addr_t addr;
+    addr.parse("127.0.0.1:9010", nullptr);
+    if (v2) {
+      addr.set_type(entity_addr_t::TYPE_MSGR2);
+    } else {
+      addr.set_type(entity_addr_t::TYPE_LEGACY);
+    }
+    addr.set_family(AF_INET);
+    return seastar::when_all_succeed(
+      server->init(entity_name_t::OSD(6), "server4", 7, addr),
+      client->init(entity_name_t::OSD(7), "client4", 8)
+    ).then([server, client] {
+      return client->send_pings(server->get_addr());
+    }).then([] {
+      return seastar::sleep(100ms);
+    }).then([client] {
+      logger().info("client shutdown...");
+      return client->shutdown();
+    }).finally([server] {
+      logger().info("server shutdown...");
+      return server->shutdown();
+    }).finally([] {
+      logger().info("test_preemptive_shutdown() done!\n");
+    });
+  });
+}
+
 }
 
 int main(int argc, char** argv)
@@ -458,6 +597,10 @@ int main(int argc, char** argv)
       return test_concurrent_dispatch(false);
     }).then([] {
       return test_concurrent_dispatch(true);
+    }).then([] {
+      return test_preemptive_shutdown(false);
+    }).then([] {
+      return test_preemptive_shutdown(true);
     }).then([] {
       std::cout << "All tests succeeded" << std::endl;
     }).handle_exception([] (auto eptr) {