]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: concurrent dispatch for SocketMessenger
authorCasey Bodley <cbodley@redhat.com>
Thu, 13 Sep 2018 18:33:03 +0000 (14:33 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 14 Sep 2018 18:47:32 +0000 (14:47 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index c52d9be2a6d97078745f2157a264b63472e5b2fd..0a8173d8e20d4b54a29e1bc18bc4d557755ff94f 100644 (file)
@@ -48,11 +48,13 @@ seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
   return seastar::keep_doing([=] {
       return conn->read_message()
         .then([=] (MessageRef msg) {
-          if (msg) {
-           return dispatcher->ms_dispatch(conn, std::move(msg));
-         } else {
-           return seastar::now();
-         }
+          // start dispatch, ignoring exceptions from the application layer
+          seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] {
+              return dispatcher->ms_dispatch(conn, std::move(msg))
+                .handle_exception([] (std::exception_ptr eptr) {});
+            });
+          // return immediately to start on the next message
+          return seastar::now();
         });
     }).handle_exception_type([=] (const std::system_error& e) {
       if (e.code() == error::connection_aborted ||
@@ -147,10 +149,16 @@ seastar::future<> SocketMessenger::shutdown()
   if (listener) {
     listener->abort_accept();
   }
+  // close all connections
   return seastar::parallel_for_each(connections.begin(), connections.end(),
     [this] (auto conn) {
       return conn.second->close();
-    }).finally([this] { connections.clear(); });
+    }).finally([this] {
+      connections.clear();
+      // closing connections will unblock any dispatchers that were waiting to
+      // send(). wait for any pending calls to finish
+      return pending_dispatch.close();
+    });
 }
 
 void SocketMessenger::set_default_policy(const SocketPolicy& p)
index 84e972a413ca0c9ce848005a15da66b5b31ca656..9297b37087f7559b87076ca341cab2698cf6f9bc 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <map>
 #include <optional>
+#include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 
 #include "msg/Policy.h"
@@ -32,6 +33,7 @@ class SocketMessenger final : public Messenger {
   std::map<entity_addr_t, ConnectionRef> connections;
   using Throttle = ceph::thread::Throttle;
   ceph::net::PolicySet<Throttle> policy_set;
+  seastar::gate pending_dispatch;
 
   seastar::future<> dispatch(ConnectionRef conn);