]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: dispatch events in SocketConnection
authorYingxin <yingxin.cheng@intel.com>
Wed, 21 Nov 2018 20:39:37 +0000 (04:39 +0800)
committerYingxin <yingxin.cheng@intel.com>
Thu, 20 Dec 2018 19:08:55 +0000 (03:08 +0800)
* move dispatch(), and exception handling logics in accept() and
  connect() from SocketMessenger into SocketConnection, so we can manage
  the state transition in the same class and at the same abstraction
  level.
* gate the dangling futures in SocketConnection, because the
  connection's smart_ptr won't be hold by messenger any more during
  exception handling.
* don't return close() inside SocketConnection to prevent recursive
  gating -- dead lock.

Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index a78bd5864bbd9954c48031151fbf7744f6a3a2c1..480e416525af4482ce72d2245bf4610f47f3d8b7 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "crimson/common/log.h"
 #include "Config.h"
+#include "Dispatcher.h"
 #include "Errors.h"
 #include "SocketMessenger.h"
 
@@ -43,15 +44,18 @@ namespace {
 }
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
-                                   const entity_addr_t& my_addr)
+                                   const entity_addr_t& my_addr,
+                                   Dispatcher& dispatcher)
   : Connection(my_addr),
     messenger(messenger),
+    dispatcher(dispatcher),
     send_ready(h.promise.get_future())
 {
 }
 
 SocketConnection::~SocketConnection()
 {
+  ceph_assert(pending_dispatch.is_closed());
   // errors were reported to callers of send()
   ceph_assert(send_ready.available());
   send_ready.ignore_ready_future();
@@ -310,10 +314,13 @@ seastar::future<> SocketConnection::close()
   assert(!close_ready.valid());
 
   if (socket) {
-    close_ready = socket->close().finally(std::move(cleanup));
+    close_ready = socket->close()
+      .then([this] {
+        return pending_dispatch.close();
+      }).finally(std::move(cleanup));
   } else {
     ceph_assert(state == state_t::connecting);
-    close_ready = seastar::now();
+    close_ready = pending_dispatch.close().finally(std::move(cleanup));
   }
   state = state_t::closing;
   return close_ready.get_future();
@@ -765,15 +772,8 @@ SocketConnection::repeat_connect()
 }
 
 seastar::future<>
-SocketConnection::start_connect(const entity_addr_t& _peer_addr,
-                                const entity_type_t& _peer_type)
+SocketConnection::start_connect()
 {
-  ceph_assert(state == state_t::none);
-  ceph_assert(!socket);
-  peer_addr = _peer_addr;
-  peer_type = _peer_type;
-  messenger.register_conn(this);
-  state = state_t::connecting;
   return seastar::connect(peer_addr.in4_addr())
     .then([this](seastar::connected_socket fd) {
       if (state == state_t::closing) {
@@ -819,16 +819,39 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
     });
 }
 
-seastar::future<>
-SocketConnection::start_accept(seastar::connected_socket&& fd,
-                               const entity_addr_t& _peer_addr)
+void
+SocketConnection::connect(const entity_addr_t& _peer_addr,
+                          const entity_type_t& _peer_type)
 {
   ceph_assert(state == state_t::none);
   ceph_assert(!socket);
   peer_addr = _peer_addr;
-  socket.emplace(std::move(fd));
-  messenger.accept_conn(this);
-  state = state_t::accepting;
+  peer_type = _peer_type;
+  messenger.register_conn(this);
+  state = state_t::connecting;
+  seastar::with_gate(pending_dispatch, [this] {
+      return start_connect()
+        .then([this] {
+          // notify the dispatcher and allow them to reject the connection
+          return seastar::with_gate(messenger.pending_dispatch, [this] {
+            return dispatcher.ms_handle_connect(this);
+          });
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // close the connection before returning errors
+          return seastar::make_exception_future<>(eptr)
+            .finally([this] { close(); });
+          // TODO: retry on fault
+        }).then([this] {
+          // dispatch replies on this connection
+          dispatch()
+            .handle_exception([] (std::exception_ptr eptr) {});
+        });
+    });
+}
+
+seastar::future<>
+SocketConnection::start_accept()
+{
   // encode/send server's handshake header
   bufferlist bl;
   bl.append(buffer::create_static(banner_size, banner));
@@ -862,6 +885,67 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
     });
 }
 
+seastar::future<>
+SocketConnection::accept(seastar::connected_socket&& fd,
+                         const entity_addr_t& _peer_addr)
+{
+  ceph_assert(state == state_t::none);
+  ceph_assert(!socket);
+  peer_addr = _peer_addr;
+  socket.emplace(std::move(fd));
+  messenger.accept_conn(this);
+  state = state_t::accepting;
+  return seastar::with_gate(pending_dispatch, [this] {
+      return start_accept()
+        .then([this] {
+          // notify the dispatcher and allow them to reject the connection
+          return seastar::with_gate(messenger.pending_dispatch, [=] {
+              return dispatcher.ms_handle_accept(this);
+            });
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // close the connection before returning errors
+          return seastar::make_exception_future<>(eptr)
+            .finally([this] { close(); });
+        }).then([this] {
+          // dispatch messages until the connection closes or the dispatch
+          // queue shuts down
+          return dispatch();
+        });
+    });
+}
+
+seastar::future<>
+SocketConnection::dispatch()
+{
+  return seastar::with_gate(pending_dispatch, [this] {
+      return seastar::keep_doing([=] {
+          return read_message()
+            .then([=] (MessageRef msg) {
+              // start dispatch, ignoring exceptions from the application layer
+              seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] {
+                  return dispatcher.ms_dispatch(this, std::move(msg))
+                    .handle_exception([] (std::exception_ptr eptr) {});
+                });
+              // return immediately to start on the next message
+              return seastar::now();
+            });
+        }).handle_exception_type([=] (const std::system_error& e) {
+          if (e.code() == error::connection_aborted ||
+              e.code() == error::connection_reset) {
+            return seastar::with_gate(messenger.pending_dispatch, [=] {
+                return dispatcher.ms_handle_reset(this);
+              });
+          } else if (e.code() == error::read_eof) {
+            return seastar::with_gate(messenger.pending_dispatch, [=] {
+                return dispatcher.ms_handle_remote_reset(this);
+              });
+          } else {
+            throw e;
+          }
+        });
+    });
+}
+
 seastar::future<> SocketConnection::fault()
 {
   if (policy.lossy) {
index 3054744450d687d1b61df1adfb53fa787a488e3e..3413b4d054e1babf42125c05e8fb6e4328f4d464 100644 (file)
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/shared_future.hh>
 
@@ -36,6 +37,8 @@ using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
 class SocketConnection : public Connection {
   SocketMessenger& messenger;
   std::optional<Socket> socket;
+  Dispatcher& dispatcher;
+  seastar::gate pending_dispatch;
 
   enum class state_t {
     none,
@@ -150,9 +153,19 @@ class SocketConnection : public Connection {
 
   seastar::future<> fault();
 
+  seastar::future<> dispatch();
+
+  /// start a handshake from the client's perspective,
+  /// only call when SocketConnection first construct
+  seastar::future<> start_connect();
+  /// start a handshake from the server's perspective,
+  /// only call when SocketConnection first construct
+  seastar::future<> start_accept();
+
  public:
   SocketConnection(SocketMessenger& messenger,
-                   const entity_addr_t& my_addr);
+                   const entity_addr_t& my_addr,
+                   Dispatcher& dispatcher);
   ~SocketConnection();
 
   Messenger* get_messenger() const override;
@@ -170,13 +183,10 @@ class SocketConnection : public Connection {
   seastar::future<> close() override;
 
  public:
-  /// complete a handshake from the client's perspective
-  seastar::future<> start_connect(const entity_addr_t& peer_addr,
-                                  const entity_type_t& peer_type);
-
-  /// complete a handshake from the server's perspective
-  seastar::future<> start_accept(seastar::connected_socket&& socket,
-                                 const entity_addr_t& peer_addr);
+  void connect(const entity_addr_t& peer_addr,
+               const entity_type_t& peer_type);
+  seastar::future<> accept(seastar::connected_socket&& socket,
+                           const entity_addr_t& peer_addr);
 
   /// read a message from a connection that has completed its handshake
   seastar::future<MessageRef> read_message();
index 827267f238dccc4a7c83e51708f8bfbc3710792a..779f5fe2e5f17a95a2ba521a0a83e79316a98a48 100644 (file)
@@ -40,35 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr)
   listener = seastar::listen(address, lo);
 }
 
-seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
-{
-  return seastar::keep_doing([=] {
-      return conn->read_message()
-        .then([=] (MessageRef msg) {
-          // start dispatch, ignoring exceptions from the application layer
-          seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] {
-              return dispatcher->ms_dispatch(conn, std::move(msg))
-                .handle_exception([] (std::exception_ptr eptr) {});
-            });
-          // return immediately to start on the next message
-          return seastar::now();
-        });
-    }).handle_exception_type([=] (const std::system_error& e) {
-      if (e.code() == error::connection_aborted ||
-          e.code() == error::connection_reset) {
-        return seastar::with_gate(pending_dispatch, [=] {
-            return dispatcher->ms_handle_reset(conn);
-          });
-      } else if (e.code() == error::read_eof) {
-        return seastar::with_gate(pending_dispatch, [=] {
-            return dispatcher->ms_handle_remote_reset(conn);
-          });
-      } else {
-        throw e;
-      }
-    });
-}
-
 seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
                                           seastar::socket_address paddr)
 {
@@ -76,23 +47,9 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
   entity_addr_t peer_addr;
   peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
   peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
+  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
   // initiate the handshake
-  return conn->start_accept(std::move(socket), peer_addr)
-    .then([this, conn] {
-      // notify the dispatcher and allow them to reject the connection
-      return seastar::with_gate(pending_dispatch, [=] {
-          return dispatcher->ms_handle_accept(conn);
-        });
-    }).handle_exception([conn] (std::exception_ptr eptr) {
-      // close the connection before returning errors
-      return seastar::make_exception_future<>(eptr)
-        .finally([conn] { return conn->close(); });
-    }).then([this, conn] {
-      // dispatch messages until the connection closes or the dispatch
-      // queue shuts down
-      return dispatch(std::move(conn));
-    });
+  return conn->accept(std::move(socket), peer_addr);
 }
 
 seastar::future<> SocketMessenger::start(Dispatcher *disp)
@@ -127,23 +84,8 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
   if (auto found = lookup_conn(peer_addr); found) {
     return found;
   }
-  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
-  conn->start_connect(peer_addr, peer_type)
-    .then([this, conn] {
-      // notify the dispatcher and allow them to reject the connection
-      return seastar::with_gate(pending_dispatch, [this, conn] {
-        return dispatcher->ms_handle_connect(conn);
-      });
-    }).handle_exception([conn] (std::exception_ptr eptr) {
-      // close the connection before returning errors
-      return seastar::make_exception_future<>(eptr)
-        .finally([conn] { return conn->close(); });
-      // TODO: retry on fault
-    }).then([this, conn] {
-      // dispatch replies on this connection
-      dispatch(conn)
-        .handle_exception([] (std::exception_ptr eptr) {});
-    });
+  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
+  conn->connect(peer_addr, peer_type);
   return conn;
 }
 
index d2ef0b6456d1caef3e4fba99b0834e778c41d864..077f8a2716e8e3760219604385dee11f9743e565 100644 (file)
@@ -36,9 +36,6 @@ class SocketMessenger final : public Messenger {
   std::set<SocketConnectionRef> accepting_conns;
   using Throttle = ceph::thread::Throttle;
   ceph::net::PolicySet<Throttle> policy_set;
-  seastar::gate pending_dispatch;
-
-  seastar::future<> dispatch(SocketConnectionRef conn);
 
   seastar::future<> accept(seastar::connected_socket socket,
                            seastar::socket_address paddr);
@@ -65,6 +62,9 @@ class SocketMessenger final : public Messenger {
                 bool force_new) override;
 
  public:
+  // TODO: change to per-connection messenger gate
+  seastar::gate pending_dispatch;
+
   void set_default_policy(const SocketPolicy& p);
   void set_policy(entity_type_t peer_type, const SocketPolicy& p);
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);