]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: add unit test for concurrent dispatch
authorCasey Bodley <cbodley@redhat.com>
Thu, 13 Sep 2018 20:34:48 +0000 (16:34 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 14 Sep 2018 18:47:32 +0000 (14:47 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/test/crimson/test_messenger.cc

index 166a9180d8d44c7d9f929a31d2db5f543cffa758..c16434113e221aadac536365e6f4815183b56955 100644 (file)
@@ -121,6 +121,69 @@ static seastar::future<> test_echo(unsigned rounds,
     });
 }
 
+static seastar::future<> test_concurrent_dispatch()
+{
+  struct test_state {
+    entity_addr_t addr;
+
+    struct {
+      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1)};
+      class ServerDispatcher : public ceph::net::Dispatcher {
+        int count = 0;
+        seastar::promise<> on_second; // satisfied on second dispatch
+        seastar::promise<> on_done; // satisfied when first dispatch unblocks
+       public:
+        seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+                                      MessageRef m) override {
+          switch (++count) {
+          case 1:
+            // block on the first request until we reenter with the second
+            return on_second.get_future().then([=] { on_done.set_value(); });
+          case 2:
+            on_second.set_value();
+            return seastar::now();
+          default:
+            throw std::runtime_error("unexpected count");
+          }
+        }
+        seastar::future<> wait() { return on_done.get_future(); }
+      } dispatcher;
+    } server;
+
+    struct {
+      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)};
+      ceph::net::Dispatcher dispatcher;
+    } client;
+  };
+  return seastar::do_with(test_state{},
+    [] (test_state& t) {
+      // bind the server
+      t.addr.set_family(AF_INET);
+      t.addr.set_port(9010);
+      t.server.messenger.bind(t.addr);
+
+      return t.server.messenger.start(&t.server.dispatcher)
+        .then([&] {
+          return t.client.messenger.start(&t.client.dispatcher)
+            .then([&] {
+              return t.client.messenger.connect(t.addr,
+                                                entity_name_t::TYPE_OSD);
+            }).then([] (ceph::net::ConnectionRef conn) {
+              // send two messages
+              conn->send(MessageRef{new MPing, false});
+              conn->send(MessageRef{new MPing, false});
+            }).then([&] {
+              // wait for the server to get both
+              return t.server.dispatcher.wait();
+            }).finally([&] {
+              return t.client.messenger.shutdown();
+            });
+        }).finally([&] {
+          return t.server.messenger.shutdown();
+        });
+    });
+}
+
 int main(int argc, char** argv)
 {
   seastar::app_template app;
@@ -136,7 +199,10 @@ int main(int argc, char** argv)
     verbose = config["verbose"].as<bool>();
     auto rounds = config["rounds"].as<unsigned>();
     auto keepalive_ratio = config["keepalive-ratio"].as<double>();
-    return test_echo(rounds, keepalive_ratio).then([] {
+    return test_echo(rounds, keepalive_ratio)
+    .then([] {
+      return test_concurrent_dispatch();
+    }).then([] {
       std::cout << "All tests succeeded" << std::endl;
     }).handle_exception([] (auto eptr) {
       std::cout << "Test failure" << std::endl;