]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: enable connections on all cores
authorYingxin Cheng <yingxincheng@gmail.com>
Tue, 12 Feb 2019 05:11:16 +0000 (13:11 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Tue, 12 Feb 2019 08:47:53 +0000 (16:47 +0800)
Implement the sharded crimson-messenger:

* Sharded Messenger: provides a shared-nothing Messenger for each shard;
  its interfaces are symmetric when called, and any modifications will be
  applied to all shards.

* Sharded/non-sharded Dispatcher interface: allows connections to be
  dispatched, and related resources (such as Session) to be managed in
  their own shard or not.

* Sharded Connection: A connection only lives on one dedicated core
  during its lifecycle. It is sharded by its peer IP in this PoC, because
  the peer port and nonce are not available when a socket is accepted.
  Its interfaces, however, are safe to be called from all shards.

* Replace `boost::intrusive_ptr` with seastar native smart ptrs for
  `Connection` and `SocketConnection`, because they need to be
  destructed from their original core.

* Unit test: establish multiple connections on both the client and server
  sides; they run concurrently and create sessions that also follow
  the shared-nothing design.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
13 files changed:
src/crimson/CMakeLists.txt
src/crimson/net/Connection.h
src/crimson/net/Dispatcher.h
src/crimson/net/Fwd.h
src/crimson/net/Messenger.cc [new file with mode: 0644]
src/crimson/net/Messenger.h
src/crimson/net/Socket.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/CMakeLists.txt
src/test/crimson/test_messenger.cc

index da1ebb13d2a7264a3d85741c004cad00fc0e21f7..3294d634f6e46b6bfd032b96e2853ac471c25e31 100644 (file)
@@ -116,6 +116,7 @@ set(crimson_mon_srcs
 set(crimson_net_srcs
   net/Dispatcher.cc
   net/Errors.cc
+  net/Messenger.cc
   net/SocketConnection.cc
   net/SocketMessenger.cc
   net/Socket.cc)
@@ -124,7 +125,8 @@ set(crimson_thread_srcs
   thread/Throttle.cc)
 add_library(crimson STATIC
   ${crimson_auth_srcs}
-  ${crimson_mon_srcs}
+  # TODO: fix crimson_mon_client with the new design
+  # ${crimson_mon_srcs}
   ${crimson_net_srcs}
   ${crimson_thread_srcs}
   ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc)
index d8eb656fd7292058f3e7b2dbe7ebe526e649bcc4..b1b72c7463253217ef0231b749543f11a2fecd60 100644 (file)
@@ -15,8 +15,8 @@
 #pragma once
 
 #include <queue>
-#include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
 
 #include "Fwd.h"
 
@@ -24,8 +24,7 @@ namespace ceph::net {
 
 using seq_num_t = uint64_t;
 
-class Connection : public boost::intrusive_ref_counter<Connection,
-                                                      boost::thread_unsafe_counter> {
+class Connection : public seastar::enable_shared_from_this<Connection> {
  protected:
   entity_addr_t peer_addr;
   peer_type_t peer_type = -1;
@@ -39,7 +38,7 @@ class Connection : public boost::intrusive_ref_counter<Connection,
   virtual int get_peer_type() const = 0;
 
   /// true if the handshake has completed and no errors have been encountered
-  virtual bool is_connected() = 0;
+  virtual seastar::future<bool> is_connected() = 0;
 
   /// send a message over a connection that has completed its handshake
   virtual seastar::future<> send(MessageRef msg) = 0;
@@ -51,6 +50,9 @@ class Connection : public boost::intrusive_ref_counter<Connection,
   /// close the connection and cancel any any pending futures from read/send
   virtual seastar::future<> close() = 0;
 
+  /// which shard id the connection lives
+  virtual seastar::shard_id shard_id() const = 0;
+
   virtual void print(ostream& out) const = 0;
 };
 
index 94d6613e269ae404936b2bbb0e4ae0cf993ecee6..cbde15499286f4a01b07f130ee3cfbd37d7ed962 100644 (file)
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <seastar/core/future.hh>
+#include <seastar/core/sharded.hh>
 
 #include "Fwd.h"
 
@@ -54,6 +55,11 @@ class Dispatcher {
   }
   virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
   ms_get_authorizer(peer_type_t);
+
+  // get the local dispatcher shard if it is accessed by another core
+  virtual Dispatcher* get_local_shard() {
+    return this;
+  }
 };
 
 } // namespace ceph::net
index 5aa04812d6021473c8b05a21b668a0c8d313de69..8a0a1c96f22c8fba57f64ec039ad5429f56260e3 100644 (file)
@@ -14,7 +14,8 @@
 
 #pragma once
 
-#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
 
 #include "msg/msg_types.h"
 #include "msg/Message.h"
@@ -27,10 +28,25 @@ namespace ceph::net {
 using msgr_tag_t = uint8_t;
 
 class Connection;
-using ConnectionRef = boost::intrusive_ptr<Connection>;
+using ConnectionRef = seastar::shared_ptr<Connection>;
+// NOTE: ConnectionXRef should only be used in seastar world, because
+// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads.
+using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>;
 
 class Dispatcher;
 
 class Messenger;
 
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+  auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+  return sharded_obj->start(args...).then([sharded_obj]() {
+      auto ret = &sharded_obj->local();
+      seastar::engine().at_exit([sharded_obj]() {
+          return sharded_obj->stop().finally([sharded_obj] {});
+        });
+      return ret;
+    });
+}
+
 } // namespace ceph::net
diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc
new file mode 100644 (file)
index 0000000..bd6c48b
--- /dev/null
@@ -0,0 +1,20 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Messenger.h"
+#include "SocketMessenger.h"
+
+namespace ceph::net {
+
+seastar::future<Messenger*>
+Messenger::create(const entity_name_t& name,
+                  const std::string& lname,
+                  const uint64_t nonce)
+{
+  return create_sharded<SocketMessenger>(name, lname, nonce)
+    .then([](Messenger *msgr) {
+      return msgr;
+    });
+}
+
+} // namespace ceph::net
index 6b77e3f5718ce1c042cdb7bcad82487defa31499..a8669f4fb0be6a6e5a7dce299aee3adbdfd2d095 100644 (file)
@@ -37,24 +37,29 @@ class Messenger {
   const entity_name_t& get_myname() const { return my_name; }
   const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
   entity_addr_t get_myaddr() const { return my_addrs.front(); }
-  virtual void set_myaddrs(const entity_addrvec_t& addrs) {
+  virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) {
     my_addrs = addrs;
+    return seastar::now();
   }
 
   /// bind to the given address
-  virtual void bind(const entity_addrvec_t& addr) = 0;
+  virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0;
 
   /// try to bind to the first unused port of given address
-  virtual void try_bind(const entity_addrvec_t& addr,
-                       uint32_t min_port, uint32_t max_port) = 0;
+  virtual seastar::future<> try_bind(const entity_addrvec_t& addr,
+                                     uint32_t min_port, uint32_t max_port) = 0;
 
   /// start the messenger
   virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
 
   /// either return an existing connection to the peer,
   /// or a new pending connection
-  virtual ConnectionRef connect(const entity_addr_t& peer_addr,
-                                const entity_type_t& peer_type) = 0;
+  virtual seastar::future<ConnectionXRef>
+  connect(const entity_addr_t& peer_addr,
+          const entity_type_t& peer_type) = 0;
+
+  // wait for messenger shutdown
+  virtual seastar::future<> wait() = 0;
 
   /// stop listenening and wait for all connections to close. safe to destruct
   /// after this future becomes available
@@ -77,7 +82,15 @@ class Messenger {
     crc_flags |= MSG_CRC_HEADER;
   }
 
+  // get the local messenger shard if it is accessed by another core
+  virtual Messenger* get_local_shard() {
+    return this;
+  }
+
   virtual void print(ostream& out) const = 0;
+
+  static seastar::future<Messenger*>
+  create(const entity_name_t& name, const std::string& lname, const uint64_t nonce);
 };
 
 inline ostream& operator<<(ostream& out, const Messenger& msgr) {
index 3c0411355088e40eaacaef849ce3c60daefdb3a2..c1a2ed59a4ce34d103c36aff7799f5fe1790c673 100644 (file)
@@ -12,6 +12,7 @@ namespace ceph::net {
 
 class Socket
 {
+  const seastar::shard_id sid;
   seastar::connected_socket socket;
   seastar::input_stream<char> in;
   seastar::output_stream<char> out;
@@ -24,10 +25,11 @@ class Socket
 
  public:
   explicit Socket(seastar::connected_socket&& _socket)
-    : socket(std::move(_socket)),
+    : sid{seastar::engine().cpu_id()},
+      socket(std::move(_socket)),
       in(socket.input()),
       out(socket.output()) {}
-  Socket(Socket&& o) = default;
+  Socket(Socket&& o) = delete;
 
   /// read the requested number of bytes into a bufferlist
   seastar::future<bufferlist> read(size_t bytes);
@@ -47,7 +49,10 @@ class Socket
 
   /// Socket can only be closed once.
   seastar::future<> close() {
-    return seastar::when_all(in.close(), out.close()).discard_result();
+    return seastar::smp::submit_to(sid, [this] {
+        return seastar::when_all(
+          in.close(), out.close()).discard_result();
+      });
   }
 };
 
index 6e2d1a023072beec37009db56ccf5c4a283e9541..1cc8963bd4bd0f4dae1e8cbecd99068a81aca884 100644 (file)
@@ -49,6 +49,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
     dispatcher(dispatcher),
     send_ready(h.promise.get_future())
 {
+  ceph_assert(&messenger.container().local() == &messenger);
 }
 
 SocketConnection::~SocketConnection()
@@ -61,37 +62,50 @@ SocketConnection::get_messenger() const {
   return &messenger;
 }
 
-bool SocketConnection::is_connected()
+seastar::future<bool> SocketConnection::is_connected()
 {
-  return !send_ready.failed();
+  return seastar::smp::submit_to(shard_id(), [this] {
+      return !send_ready.failed();
+    });
 }
 
 seastar::future<> SocketConnection::send(MessageRef msg)
 {
-  if (state == state_t::closing)
-    return seastar::now();
-  return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
-      return do_send(std::move(msg))
-        .handle_exception([this] (std::exception_ptr eptr) {
-          logger().warn("{} send fault: {}", *this, eptr);
-          close();
+  return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
+      if (state == state_t::closing)
+        return seastar::now();
+      return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+          return do_send(std::move(msg))
+            .handle_exception([this] (std::exception_ptr eptr) {
+              logger().warn("{} send fault: {}", *this, eptr);
+              close();
+            });
         });
     });
 }
 
 seastar::future<> SocketConnection::keepalive()
 {
-  if (state == state_t::closing)
-    return seastar::now();
-  return seastar::with_gate(pending_dispatch, [this] {
-      return do_keepalive()
-        .handle_exception([this] (std::exception_ptr eptr) {
-          logger().warn("{} keepalive fault: {}", *this, eptr);
-          close();
+  return seastar::smp::submit_to(shard_id(), [this] {
+      if (state == state_t::closing)
+        return seastar::now();
+      return seastar::with_gate(pending_dispatch, [this] {
+          return do_keepalive()
+            .handle_exception([this] (std::exception_ptr eptr) {
+              logger().warn("{} keepalive fault: {}", *this, eptr);
+              close();
+            });
         });
     });
 }
 
+seastar::future<> SocketConnection::close()
+{
+  return seastar::smp::submit_to(shard_id(), [this] {
+      return do_close();
+    });
+}
+
 seastar::future<> SocketConnection::handle_tags()
 {
   return seastar::keep_doing([this] {
@@ -196,10 +210,13 @@ seastar::future<> SocketConnection::read_message()
       }
 
       constexpr bool add_ref = false; // Message starts with 1 ref
+      // TODO: change MessageRef with foreign_ptr
       auto msg_ref = MessageRef{msg, add_ref};
       // start dispatch, ignoring exceptions from the application layer
       seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
-          return dispatcher.ms_dispatch(this, std::move(msg))
+          return dispatcher.ms_dispatch(
+              seastar::static_pointer_cast<SocketConnection>(shared_from_this()),
+              std::move(msg))
             .handle_exception([this] (std::exception_ptr eptr) {
               logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
               ceph_assert(false);
@@ -298,7 +315,7 @@ seastar::future<> SocketConnection::do_keepalive()
   return f.get_future();
 }
 
-seastar::future<> SocketConnection::close()
+seastar::future<> SocketConnection::do_close()
 {
   if (state == state_t::closing) {
     // already closing
@@ -307,12 +324,14 @@ seastar::future<> SocketConnection::close()
   }
 
   // unregister_conn() drops a reference, so hold another until completion
-  auto cleanup = [conn = SocketConnectionRef(this)] {};
+  auto cleanup = [conn_ref = shared_from_this(), this] {
+      logger().debug("{} closed!", *this);
+    };
 
   if (state == state_t::accepting) {
-    messenger.unaccept_conn(this);
+    messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
   } else if (state >= state_t::connecting && state < state_t::closing) {
-    messenger.unregister_conn(this);
+    messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
   } else {
     // cannot happen
     ceph_assert(false);
@@ -785,7 +804,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
   ceph_assert(!socket);
   peer_addr = _peer_addr;
   peer_type = _peer_type;
-  messenger.register_conn(this);
+  messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
   logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
   state = state_t::connecting;
   seastar::with_gate(pending_dispatch, [this] {
@@ -796,7 +815,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
             fd.shutdown_output();
             throw std::system_error(make_error_code(error::connection_aborted));
           }
-          socket.emplace(std::move(fd));
+          socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
           // read server's handshake header
           return socket->read(server_header_size);
         }).then([this] (bufferlist headerbl) {
@@ -810,8 +829,8 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
 
           side = side_t::connector;
           socket_port = caddr.get_port();
-          messenger.learned_addr(caddr);
-
+          return messenger.learned_addr(caddr);
+        }).then([this] {
           // encode/send client's handshake header
           bufferlist bl;
           bl.append(buffer::create_static(banner_size, banner));
@@ -824,7 +843,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
           });
         }).then([this] {
           // notify the dispatcher and allow them to reject the connection
-          return dispatcher.ms_handle_connect(this);
+          return dispatcher.ms_handle_connect(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
         }).then([this] {
           execute_open();
         }).handle_exception([this] (std::exception_ptr eptr) {
@@ -837,7 +856,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
 }
 
 void
-SocketConnection::start_accept(seastar::connected_socket&& fd,
+SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock,
                                const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::none);
@@ -846,8 +865,8 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
   peer_addr.set_port(0);
   side = side_t::acceptor;
   socket_port = _peer_addr.get_port();
-  socket.emplace(std::move(fd));
-  messenger.accept_conn(this);
+  socket = std::move(sock);
+  messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
   logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
   state = state_t::accepting;
   seastar::with_gate(pending_dispatch, [this, _peer_addr] {
@@ -874,10 +893,10 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
           });
         }).then([this] {
           // notify the dispatcher and allow them to reject the connection
-          return dispatcher.ms_handle_accept(this);
+          return dispatcher.ms_handle_accept(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
         }).then([this] {
-          messenger.register_conn(this);
-          messenger.unaccept_conn(this);
+          messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+          messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
           execute_open();
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the accepting state
@@ -902,12 +921,12 @@ SocketConnection::execute_open()
           logger().warn("{} open fault: {}", *this, e);
           if (e.code() == error::connection_aborted ||
               e.code() == error::connection_reset) {
-            return dispatcher.ms_handle_reset(this)
+            return dispatcher.ms_handle_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
               .then([this] {
                 close();
               });
           } else if (e.code() == error::read_eof) {
-            return dispatcher.ms_handle_remote_reset(this)
+            return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
               .then([this] {
                 close();
               });
@@ -925,7 +944,7 @@ SocketConnection::execute_open()
 seastar::future<> SocketConnection::fault()
 {
   if (policy.lossy) {
-    messenger.unregister_conn(this);
+    messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
   }
   if (h.backoff.count()) {
     h.backoff += h.backoff;
@@ -938,6 +957,10 @@ seastar::future<> SocketConnection::fault()
   return seastar::sleep(h.backoff);
 }
 
+seastar::shard_id SocketConnection::shard_id() const {
+  return messenger.shard_id();
+}
+
 void SocketConnection::print(ostream& out) const {
     messenger.print(out);
     if (side == side_t::none) {
index 6d6ea51762759a1cd632deef8f0248f6833edf2a..62cc77d534776617a674e108ecce01e88ada0a2e 100644 (file)
@@ -17,6 +17,7 @@
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/shared_future.hh>
+#include <seastar/core/sharded.hh>
 
 #include "msg/Policy.h"
 #include "Connection.h"
@@ -32,11 +33,11 @@ using stop_t = seastar::stop_iteration;
 
 class SocketMessenger;
 class SocketConnection;
-using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
+using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
 
 class SocketConnection : public Connection {
   SocketMessenger& messenger;
-  std::optional<Socket> socket;
+  seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
   Dispatcher& dispatcher;
   seastar::gate pending_dispatch;
 
@@ -162,6 +163,7 @@ class SocketConnection : public Connection {
 
   seastar::future<> do_send(MessageRef msg);
   seastar::future<> do_keepalive();
+  seastar::future<> do_close();
 
  public:
   SocketConnection(SocketMessenger& messenger,
@@ -174,7 +176,7 @@ class SocketConnection : public Connection {
     return peer_type;
   }
 
-  bool is_connected() override;
+  seastar::future<bool> is_connected() override;
 
   seastar::future<> send(MessageRef msg) override;
 
@@ -182,6 +184,8 @@ class SocketConnection : public Connection {
 
   seastar::future<> close() override;
 
+  seastar::shard_id shard_id() const override;
+
   void print(ostream& out) const override;
 
  public:
@@ -191,7 +195,7 @@ class SocketConnection : public Connection {
                      const entity_type_t& peer_type);
   /// start a handshake from the server's perspective,
   /// only call when SocketConnection first construct
-  void start_accept(seastar::connected_socket&& socket,
+  void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
                     const entity_addr_t& peer_addr);
 
   /// the number of connections initiated in this session, increment when a
index c0ad81cd3b68fb4fc9cbd8fc8bc7349f713adf71..c49729e8c37425122c46c5b37e2efee03a9f4a97 100644 (file)
 #include "SocketMessenger.h"
 
 #include <tuple>
+#include <boost/functional/hash.hpp>
 
 #include "auth/Auth.h"
 #include "Errors.h"
 #include "Dispatcher.h"
+#include "Socket.h"
 
 using namespace ceph::net;
 
@@ -31,59 +33,106 @@ namespace {
 SocketMessenger::SocketMessenger(const entity_name_t& myname,
                                  const std::string& logic_name,
                                  uint32_t nonce)
-  : Messenger{myname}, logic_name{logic_name}, nonce{nonce}
+  : Messenger{myname},
+    sid{seastar::engine().cpu_id()},
+    logic_name{logic_name},
+    nonce{nonce}
 {}
 
-void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
+seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 {
   auto my_addrs = addrs;
   for (auto& addr : my_addrs.v) {
     addr.nonce = nonce;
   }
-  // TODO: propagate to all the cores of the Messenger
-  Messenger::set_myaddrs(my_addrs);
+  return container().invoke_on_all([my_addrs](auto& msgr) {
+      return msgr.Messenger::set_myaddrs(my_addrs);
+    });
 }
 
-void SocketMessenger::bind(const entity_addrvec_t& addrs)
+seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
 {
-  // TODO: v2: listen on multiple addresses
-  auto addr = addrs.legacy_addr();
-
-  if (addr.get_family() != AF_INET) {
-    throw std::system_error(EAFNOSUPPORT, std::generic_category());
+  ceph_assert(addrs.legacy_addr().get_family() == AF_INET);
+  auto my_addrs = addrs;
+  for (auto& addr : my_addrs.v) {
+    addr.nonce = nonce;
   }
-
-  set_myaddrs(addrs);
-  logger().info("listening on {}", addr);
-  seastar::socket_address address(addr.in4_addr());
-  seastar::listen_options lo;
-  lo.reuse_address = true;
-  listener = seastar::listen(address, lo);
+  logger().info("listening on {}", my_addrs.legacy_addr().in4_addr());
+  return container().invoke_on_all([my_addrs](auto& msgr) {
+      msgr.do_bind(my_addrs);
+    });
 }
 
-void SocketMessenger::try_bind(const entity_addrvec_t& addrs,
-                              uint32_t min_port, uint32_t max_port)
+seastar::future<>
+SocketMessenger::try_bind(const entity_addrvec_t& addrs,
+                          uint32_t min_port, uint32_t max_port)
 {
   auto addr = addrs.legacy_or_front_addr();
   if (addr.get_port() != 0) {
     return bind(addrs);
   }
-  for (auto port = min_port; port <= max_port; port++) {
-    try {
-      addr.set_port(port);
-      bind(entity_addrvec_t{addr});
-      logger().info("{}: try_bind: done", *this);
-      return;
-    } catch (const std::system_error& e) {
-      logger().debug("{}: try_bind: {} already used", *this, port);
-      if (port == max_port) {
-       throw;
-      }
-    }
-  }
+  ceph_assert(min_port <= max_port);
+  return seastar::do_with(uint32_t(min_port),
+    [this, max_port, addr] (auto& port) {
+      return seastar::repeat([this, max_port, addr, &port] {
+          auto to_bind = addr;
+          to_bind.set_port(port);
+          return bind(entity_addrvec_t{to_bind})
+            .then([this] {
+              logger().info("{}: try_bind: done", *this);
+              return stop_t::yes;
+            }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
+              logger().debug("{}: try_bind: {} already used", *this, port);
+              if (port == max_port) {
+                throw e;
+              }
+              ++port;
+              return stop_t::no;
+            });
+        });
+    });
+}
+
+seastar::future<> SocketMessenger::start(Dispatcher *disp) {
+  return container().invoke_on_all([disp](auto& msgr) {
+      return msgr.do_start(disp->get_local_shard());
+    });
+}
+
+seastar::future<ceph::net::ConnectionXRef>
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+{
+  auto shard = locate_shard(peer_addr);
+  return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
+      return msgr.do_connect(peer_addr, peer_type);
+    }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) {
+      return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn));
+    });
 }
 
-seastar::future<> SocketMessenger::start(Dispatcher *disp)
+seastar::future<> SocketMessenger::shutdown()
+{
+  return container().invoke_on_all([](auto& msgr) {
+      return msgr.do_shutdown();
+    }).finally([this] {
+      return container().invoke_on_all([](auto& msgr) {
+          msgr.shutdown_promise.set_value();
+        });
+    });
+}
+
+void SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+{
+  Messenger::set_myaddrs(addrs);
+
+  // TODO: v2: listen on multiple addresses
+  seastar::socket_address address(addrs.legacy_addr().in4_addr());
+  seastar::listen_options lo;
+  lo.reuse_address = true;
+  listener = seastar::listen(address, lo);
+}
+
+seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
 {
   dispatcher = disp;
 
@@ -96,9 +145,15 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
             // allocate the connection
             entity_addr_t peer_addr;
             peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-            SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
+            auto shard = locate_shard(peer_addr);
+#warning fixme
+            // we currently do dangerous i/o from a Connection core, different from the Socket core.
+            auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
             // don't wait before accepting another
-            conn->start_accept(std::move(socket), peer_addr);
+            container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
+                SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
+                conn->start_accept(std::move(sock), peer_addr);
+              });
           });
       }).handle_exception_type([this] (const std::system_error& e) {
         // stop gracefully on connection_aborted
@@ -111,18 +166,18 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
   return seastar::now();
 }
 
-ceph::net::ConnectionRef
-SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+seastar::foreign_ptr<ceph::net::ConnectionRef>
+SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
 {
   if (auto found = lookup_conn(peer_addr); found) {
-    return found;
+    return seastar::make_foreign(found->shared_from_this());
   }
-  SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
+  SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher);
   conn->start_connect(peer_addr, peer_type);
-  return conn;
+  return seastar::make_foreign(conn->shared_from_this());
 }
 
-seastar::future<> SocketMessenger::shutdown()
+seastar::future<> SocketMessenger::do_shutdown()
 {
   if (listener) {
     listener->abort_accept();
@@ -140,11 +195,11 @@ seastar::future<> SocketMessenger::shutdown()
     });
 }
 
-void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 {
   if (!get_myaddr().is_blank_ip()) {
     // already learned or binded
-    return;
+    return seastar::now();
   }
 
   // Only learn IP address if blank.
@@ -152,7 +207,7 @@ void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   addr.u = peer_addr_for_me.u;
   addr.set_type(peer_addr_for_me.get_type());
   addr.set_port(get_myaddr().get_port());
-  set_myaddrs(entity_addrvec_t{addr});
+  return set_myaddrs(entity_addrvec_t{addr});
 }
 
 void SocketMessenger::set_default_policy(const SocketPolicy& p)
@@ -173,6 +228,16 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
   policy_set.set_throttlers(peer_type, throttle, nullptr);
 }
 
+seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
+{
+  ceph_assert(addr.get_family() == AF_INET);
+  std::size_t seed = 0;
+  boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
+  //boost::hash_combine(seed, addr.u.sin.sin_port);
+  //boost::hash_combine(seed, addr.nonce);
+  return seed % seastar::smp::count;
+}
+
 ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
 {
   if (auto found = connections.find(addr);
index 0fb5d9fd7dd0a8f03d4b15c55911b684abef6374..20b0ad56e93f0a7e84355c56f79f9f94d5cbb51a 100644 (file)
@@ -19,6 +19,7 @@
 #include <set>
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
 
 #include "msg/Policy.h"
 #include "Messenger.h"
@@ -29,7 +30,10 @@ namespace ceph::net {
 
 using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
 
-class SocketMessenger final : public Messenger {
+class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
+  const seastar::shard_id sid;
+  seastar::promise<> shutdown_promise;
+
   std::optional<seastar::server_socket> listener;
   Dispatcher *dispatcher = nullptr;
   std::map<entity_addr_t, SocketConnectionRef> connections;
@@ -43,25 +47,46 @@ class SocketMessenger final : public Messenger {
   seastar::future<> accept(seastar::connected_socket socket,
                            seastar::socket_address paddr);
 
+  void do_bind(const entity_addrvec_t& addr);
+  seastar::future<> do_start(Dispatcher *disp);
+  seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
+                                                 const entity_type_t& peer_type);
+  seastar::future<> do_shutdown();
+  // conn sharding options:
+  // 1. Simplest: sharded by ip only
+  // 2. Balanced: sharded by ip + port + nonce,
+  //        but, need to move SocketConnection between cores.
+  seastar::shard_id locate_shard(const entity_addr_t& addr);
+
  public:
   SocketMessenger(const entity_name_t& myname,
                   const std::string& logic_name,
                   uint32_t nonce);
 
-  void set_myaddrs(const entity_addrvec_t& addr) override;
+  seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
 
-  void bind(const entity_addrvec_t& addr) override;
+  // Messenger interfaces are assumed to be called from its own shard, but its
+  // behavior should be symmetric when called from any shard.
+  seastar::future<> bind(const entity_addrvec_t& addr) override;
 
-  void try_bind(const entity_addrvec_t& addr,
-               uint32_t min_port, uint32_t max_port) override;
+  seastar::future<> try_bind(const entity_addrvec_t& addr,
+                             uint32_t min_port, uint32_t max_port) override;
 
   seastar::future<> start(Dispatcher *dispatcher) override;
 
-  ConnectionRef connect(const entity_addr_t& peer_addr,
-                        const entity_type_t& peer_type) override;
+  seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
+                                          const entity_type_t& peer_type) override;
+  // can only wait once
+  seastar::future<> wait() override {
+    return shutdown_promise.get_future();
+  }
 
   seastar::future<> shutdown() override;
 
+  Messenger* get_local_shard() override {
+    return &container().local();
+  }
+
   void print(ostream& out) const override {
     out << get_myname()
         << "(" << logic_name
@@ -69,7 +94,7 @@ class SocketMessenger final : public Messenger {
   }
 
  public:
-  void learned_addr(const entity_addr_t &peer_addr_for_me);
+  seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
   void set_default_policy(const SocketPolicy& p);
   void set_policy(entity_type_t peer_type, const SocketPolicy& p);
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
@@ -79,6 +104,15 @@ class SocketMessenger final : public Messenger {
   void unaccept_conn(SocketConnectionRef);
   void register_conn(SocketConnectionRef);
   void unregister_conn(SocketConnectionRef);
+
+  // required by sharded<>
+  seastar::future<> stop() {
+    return seastar::make_ready_future<>();
+  }
+
+  seastar::shard_id shard_id() const {
+    return sid;
+  }
 };
 
 } // namespace ceph::net
index 7ce959c044f8062f000b5ceb14d959c1ad1b83cc..d06fd341bd1446875627a93553c766b53e8a7307 100644 (file)
@@ -9,12 +9,13 @@ add_ceph_unittest(unittest_seastar_denc)
 target_link_libraries(unittest_seastar_denc crimson GTest::Main)
 
 add_executable(unittest_seastar_messenger test_messenger.cc)
-#add_ceph_unittest(unittest_seastar_messenger)
+add_ceph_unittest(unittest_seastar_messenger)
 target_link_libraries(unittest_seastar_messenger ceph-common crimson)
 
-add_executable(unittest_seastar_echo
-  test_alien_echo.cc)
-target_link_libraries(unittest_seastar_echo ceph-common global crimson)
+# TODO: fix unittest_seastar_echo with the new design
+#add_executable(unittest_seastar_echo
+#  test_alien_echo.cc)
+#target_link_libraries(unittest_seastar_echo ceph-common global crimson)
 
 add_executable(unittest_seastar_thread_pool
   test_thread_pool.cc)
@@ -25,9 +26,10 @@ add_executable(unittest_seastar_config
   test_config.cc)
 target_link_libraries(unittest_seastar_config crimson)
 
-add_executable(unittest_seastar_monc
-  test_monc.cc)
-target_link_libraries(unittest_seastar_monc crimson)
+# TODO: fix unittest_seastar_monc with the new design
+#add_executable(unittest_seastar_monc
+#  test_monc.cc)
+#target_link_libraries(unittest_seastar_monc crimson)
 
 add_executable(unittest_seastar_perfcounters
   test_perfcounters.cc)
index 9b6cede47ab75ae7e5b4422f3f871035f0517175..d13dc9f6782fc390e23b8072ebeaed8845d44179 100644 (file)
@@ -1,16 +1,26 @@
 #include "messages/MPing.h"
+#include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Dispatcher.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
 
+#include <map>
 #include <random>
 #include <boost/program_options.hpp>
 #include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
 #include <seastar/core/future-util.hh>
 #include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
 
 namespace bpo = boost::program_options;
 
+namespace {
+
+seastar::logger& logger() {
+  return ceph::get_logger(ceph_subsys_ms);
+}
+
 static std::random_device rd;
 static std::default_random_engine rng{rd()};
 static bool verbose = false;
@@ -19,106 +29,226 @@ static seastar::future<> test_echo(unsigned rounds,
                                    double keepalive_ratio)
 {
   struct test_state {
-    entity_addr_t addr;
-
-    struct {
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1};
-      struct ServerDispatcher : ceph::net::Dispatcher {
-        seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
-                                      MessageRef m) override {
-          if (verbose) {
-            std::cout << "server got " << *m << std::endl;
-          }
-          // reply with a pong
-          return c->send(MessageRef{new MPing(), false});
+    struct Server final
+        : public ceph::net::Dispatcher,
+          public seastar::peering_sharded_service<Server> {
+      ceph::net::Messenger *msgr = nullptr;
+
+      Dispatcher* get_local_shard() override {
+        return &(container().local());
+      }
+      seastar::future<> stop() {
+        return seastar::make_ready_future<>();
+      }
+      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+                                    MessageRef m) override {
+        if (verbose) {
+          logger().info("server got {}", *m);
         }
-      } dispatcher;
-    } server;
+        // reply with a pong
+        return c->send(MessageRef{new MPing(), false});
+      }
 
-    struct {
-      unsigned rounds;
-      std::bernoulli_distribution keepalive_dist{};
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2};
-      struct ClientDispatcher : ceph::net::Dispatcher {
-        seastar::promise<MessageRef> reply;
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce,
+                             const entity_addr_t& addr) {
+        auto&& fut = ceph::net::Messenger::create(name, lname, nonce);
+        return fut.then([this, addr](ceph::net::Messenger *messenger) {
+            return container().invoke_on_all([messenger](auto& server) {
+                server.msgr = messenger->get_local_shard();
+              }).then([messenger, addr] {
+                return messenger->bind(entity_addrvec_t{addr});
+              }).then([this, messenger] {
+                return messenger->start(this);
+              });
+          });
+      }
+      seastar::future<> shutdown() {
+        ceph_assert(msgr);
+        return msgr->shutdown();
+      }
+    };
+
+    struct Client final
+        : public ceph::net::Dispatcher,
+          public seastar::peering_sharded_service<Client> {
+
+      struct PingSession : public seastar::enable_shared_from_this<PingSession> {
         unsigned count = 0u;
-        seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
-                                      MessageRef m) override {
-          ++count;
-          if (verbose) {
-            std::cout << "client ms_dispatch " << count << std::endl;
-          }
-          reply.set_value(std::move(m));
+      };
+      using PingSessionRef = seastar::shared_ptr<PingSession>;
+
+      unsigned rounds;
+      std::bernoulli_distribution keepalive_dist;
+      ceph::net::Messenger *msgr = nullptr;
+      std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
+      std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+
+      Client(unsigned rounds, double keepalive_ratio)
+        : rounds(rounds),
+          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
+      Dispatcher* get_local_shard() override {
+        return &(container().local());
+      }
+      seastar::future<> stop() {
+        return seastar::now();
+      }
+      seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
+        logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
+        auto session = seastar::make_shared<PingSession>();
+        auto [i, added] = sessions.emplace(conn, session);
+        std::ignore = i;
+        ceph_assert(added);
+        return container().invoke_on_all([conn = conn.get()](auto& client) {
+            auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+            std::ignore = i;
+            ceph_assert(added);
+          });
+      }
+      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+                                    MessageRef m) override {
+        auto found = sessions.find(c);
+        if (found == sessions.end()) {
+          ceph_assert(false);
+        }
+        auto session = found->second;
+        ++(session->count);
+        if (verbose) {
+          logger().info("client ms_dispatch {}", session->count);
+        }
+
+        if (session->count == rounds) {
+          logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+          return container().invoke_on_all([conn = c.get()](auto &client) {
+              auto found = client.pending_conns.find(conn);
+              ceph_assert(found != client.pending_conns.end());
+              found->second.set_value();
+            });
+        } else {
           return seastar::now();
         }
-      } dispatcher;
-      seastar::future<> pingpong(ceph::net::ConnectionRef c) {
-        return seastar::repeat([conn=std::move(c), this] {
-          if (keepalive_dist(rng)) {
-            return conn->keepalive().then([] {
-              return seastar::make_ready_future<seastar::stop_iteration>(
-                seastar::stop_iteration::no);
+      }
+
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce) {
+        return ceph::net::Messenger::create(name, lname, nonce)
+          .then([this](ceph::net::Messenger *messenger) {
+            return container().invoke_on_all([messenger](auto& client) {
+                client.msgr = messenger->get_local_shard();
+              }).then([this, messenger] {
+                return messenger->start(this);
               });
-          } else {
-            return conn->send(MessageRef{new MPing(), false}).then([&] {
-              return dispatcher.reply.get_future();
-            }).then([&] (MessageRef msg) {
-              dispatcher.reply = seastar::promise<MessageRef>{};
-              if (verbose) {
-                std::cout << "client got reply " << *msg << std::endl;
-              }
-              return seastar::make_ready_future<seastar::stop_iteration>(
-                  seastar::stop_iteration::yes);
-            });
-          };
-        });
+          });
       }
-      bool done() const {
-        return dispatcher.count >= rounds;
+
+      seastar::future<> shutdown() {
+        ceph_assert(msgr);
+        return msgr->shutdown();
       }
-    } client;
-  };
-  return seastar::do_with(test_state{},
-    [rounds, keepalive_ratio] (test_state& t) {
-      // bind the server
-      t.addr.set_type(entity_addr_t::TYPE_LEGACY);
-      t.addr.set_family(AF_INET);
-      t.addr.set_port(9010);
-      t.addr.set_nonce(1);
-      t.server.messenger.bind(entity_addrvec_t{t.addr});
-
-      t.client.rounds = rounds;
-      t.client.keepalive_dist = std::bernoulli_distribution{keepalive_ratio};
-
-      return t.server.messenger.start(&t.server.dispatcher)
-        .then([&] {
-          return t.client.messenger.start(&t.client.dispatcher)
-            .then([&] {
-              return t.client.messenger.connect(t.addr,
-                                                entity_name_t::TYPE_OSD);
-            }).then([&client=t.client] (ceph::net::ConnectionRef conn) {
-              if (verbose) {
-                std::cout << "client connected" << std::endl;
-              }
-              return seastar::repeat([&client,conn=std::move(conn)] {
-                return client.pingpong(conn).then([&client] {
-                  return seastar::make_ready_future<seastar::stop_iteration>(
-                    client.done() ?
-                    seastar::stop_iteration::yes :
-                    seastar::stop_iteration::no);
-                });
+
+      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+          .then([this, foreign_dispatch](auto conn) {
+            if (foreign_dispatch) {
+              return do_dispatch_pingpong(&**conn)
+                .finally([this, conn] {});
+            } else {
+              // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
+              return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+                  return client.do_dispatch_pingpong(conn);
+                }).finally([this, conn] {});
+            }
+          });
+      }
+
+     private:
+      seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
+        return seastar::do_with(0u, 0u,
+                                [this, conn](auto &count_ping, auto &count_keepalive) {
+            return seastar::do_until(
+              [this, conn, &count_ping, &count_keepalive] {
+                bool stop = (count_ping == rounds);
+                if (stop) {
+                  logger().info("{}: finished sending {} pings with {} keepalives",
+                                *conn, count_ping, count_keepalive);
+                }
+                return stop;
+              },
+              [this, conn, &count_ping, &count_keepalive] {
+                return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+                    if (keepalive_dist(rng)) {
+                      count_keepalive += 1;
+                      return conn->keepalive()
+                        .then([&count_keepalive] {
+                          return seastar::make_ready_future<seastar::stop_iteration>(
+                            seastar::stop_iteration::no);
+                        });
+                    } else {
+                      count_ping += 1;
+                      return conn->send(MessageRef{new MPing(), false})
+                        .then([] {
+                          return seastar::make_ready_future<seastar::stop_iteration>(
+                            seastar::stop_iteration::yes);
+                        });
+                    }
+                  });
+              }).then([this, conn] {
+                auto found = pending_conns.find(conn);
+                if (found == pending_conns.end())
+                  throw std::runtime_error{"Not connected."};
+                return found->second.get_future();
               });
-            }).finally([&] {
-              if (verbose) {
-                std::cout << "client shutting down" << std::endl;
-              }
-              return t.client.messenger.shutdown();
-            });
-        }).finally([&] {
-          if (verbose) {
-            std::cout << "server shutting down" << std::endl;
-          }
-          return t.server.messenger.shutdown();
+          });
+      }
+    };
+  };
+
+  logger().info("test_echo():");
+  return seastar::when_all_succeed(
+      ceph::net::create_sharded<test_state::Server>(),
+      ceph::net::create_sharded<test_state::Server>(),
+      ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
+      ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
+    .then([rounds, keepalive_ratio](test_state::Server *server1,
+                                    test_state::Server *server2,
+                                    test_state::Client *client1,
+                                    test_state::Client *client2) {
+      // start servers and clients
+      entity_addr_t addr1;
+      addr1.parse("127.0.0.1:9010", nullptr);
+      addr1.set_type(entity_addr_t::TYPE_LEGACY);
+      entity_addr_t addr2;
+      addr2.parse("127.0.0.1:9011", nullptr);
+      addr2.set_type(entity_addr_t::TYPE_LEGACY);
+      return seastar::when_all_succeed(
+          server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+          server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+          client1->init(entity_name_t::OSD(2), "client1", 3),
+          client2->init(entity_name_t::OSD(3), "client2", 4))
+      // dispatch pingpong
+        .then([client1, client2, server1, server2] {
+          return seastar::when_all_succeed(
+              // test connecting in parallel, accepting in parallel,
+              // and operating the connection reference from a foreign/local core
+              client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
+              client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
+              client2->dispatch_pingpong(server1->msgr->get_myaddr(), false),
+              client2->dispatch_pingpong(server2->msgr->get_myaddr(), true));
+      // shutdown
+        }).finally([client1] {
+          logger().info("client1 shutdown...");
+          return client1->shutdown();
+        }).finally([client2] {
+          logger().info("client2 shutdown...");
+          return client2->shutdown();
+        }).finally([server1] {
+          logger().info("server1 shutdown...");
+          return server1->shutdown();
+        }).finally([server2] {
+          logger().info("server2 shutdown...");
+          return server2->shutdown();
         });
     });
 }
@@ -126,68 +256,120 @@ static seastar::future<> test_echo(unsigned rounds,
 static seastar::future<> test_concurrent_dispatch()
 {
   struct test_state {
-    entity_addr_t addr;
-
-    struct {
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3};
-      class ServerDispatcher : public ceph::net::Dispatcher {
-        int count = 0;
-        seastar::promise<> on_second; // satisfied on second dispatch
-        seastar::promise<> on_done; // satisfied when first dispatch unblocks
-       public:
-        seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
-                                      MessageRef m) override {
-          switch (++count) {
-          case 1:
-            // block on the first request until we reenter with the second
-            return on_second.get_future().then([=] { on_done.set_value(); });
-          case 2:
-            on_second.set_value();
-            return seastar::now();
-          default:
-            throw std::runtime_error("unexpected count");
-          }
+    struct Server final
+      : public ceph::net::Dispatcher,
+        public seastar::peering_sharded_service<Server> {
+      ceph::net::Messenger *msgr = nullptr;
+      int count = 0;
+      seastar::promise<> on_second; // satisfied on second dispatch
+      seastar::promise<> on_done; // satisfied when first dispatch unblocks
+
+      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+                                    MessageRef m) override {
+        switch (++count) {
+        case 1:
+          // block on the first request until we reenter with the second
+          return on_second.get_future()
+            .then([this] {
+              return container().invoke_on_all([](Server& server) {
+                  server.on_done.set_value();
+                });
+            });
+        case 2:
+          on_second.set_value();
+          return seastar::now();
+        default:
+          throw std::runtime_error("unexpected count");
         }
-        seastar::future<> wait() { return on_done.get_future(); }
-      } dispatcher;
-    } server;
-
-    struct {
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4};
-      ceph::net::Dispatcher dispatcher;
-    } client;
+      }
+
+      seastar::future<> wait() { return on_done.get_future(); }
+
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce,
+                             const entity_addr_t& addr) {
+        return ceph::net::Messenger::create(name, lname, nonce)
+          .then([this, addr](ceph::net::Messenger *messenger) {
+            return container().invoke_on_all([messenger](auto& server) {
+                server.msgr = messenger->get_local_shard();
+              }).then([messenger, addr] {
+                return messenger->bind(entity_addrvec_t{addr});
+              }).then([this, messenger] {
+                return messenger->start(this);
+              });
+          });
+      }
+
+      Dispatcher* get_local_shard() override {
+        return &(container().local());
+      }
+      seastar::future<> stop() {
+        return seastar::make_ready_future<>();
+      }
+    };
+
+    struct Client final
+      : public ceph::net::Dispatcher,
+        public seastar::peering_sharded_service<Client> {
+      ceph::net::Messenger *msgr = nullptr;
+
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce) {
+        return ceph::net::Messenger::create(name, lname, nonce)
+          .then([this](ceph::net::Messenger *messenger) {
+            return container().invoke_on_all([messenger](auto& client) {
+                client.msgr = messenger->get_local_shard();
+              }).then([this, messenger] {
+                return messenger->start(this);
+              });
+          });
+      }
+
+      Dispatcher* get_local_shard() override {
+        return &(container().local());
+      }
+      seastar::future<> stop() {
+        return seastar::make_ready_future<>();
+      }
+    };
   };
-  return seastar::do_with(test_state{},
-    [] (test_state& t) {
-      // bind the server
-      t.addr.set_type(entity_addr_t::TYPE_LEGACY);
-      t.addr.set_family(AF_INET);
-      t.addr.set_port(9010);
-      t.addr.set_nonce(3);
-      t.server.messenger.bind(entity_addrvec_t{t.addr});
-
-      return t.server.messenger.start(&t.server.dispatcher)
-        .then([&] {
-          return t.client.messenger.start(&t.client.dispatcher)
-            .then([&] {
-              return t.client.messenger.connect(t.addr,
-                                                entity_name_t::TYPE_OSD);
-            }).then([] (ceph::net::ConnectionRef conn) {
-              // send two messages
-              conn->send(MessageRef{new MPing, false});
-              conn->send(MessageRef{new MPing, false});
-            }).then([&] {
-              // wait for the server to get both
-              return t.server.dispatcher.wait();
-            }).finally([&] {
-              return t.client.messenger.shutdown();
-            });
-        }).finally([&] {
-          return t.server.messenger.shutdown();
+
+  logger().info("test_concurrent_dispatch():");
+  return seastar::when_all_succeed(
+      ceph::net::create_sharded<test_state::Server>(),
+      ceph::net::create_sharded<test_state::Client>())
+    .then([](test_state::Server *server,
+             test_state::Client *client) {
+      entity_addr_t addr;
+      addr.parse("127.0.0.1:9010", nullptr);
+      addr.set_type(entity_addr_t::TYPE_LEGACY);
+      addr.set_family(AF_INET);
+      return seastar::when_all_succeed(
+          server->init(entity_name_t::OSD(4), "server3", 5, addr),
+          client->init(entity_name_t::OSD(5), "client3", 6))
+        .then([server, client] {
+          return client->msgr->connect(server->msgr->get_myaddr(),
+                                      entity_name_t::TYPE_OSD);
+        }).then([](ceph::net::ConnectionXRef conn) {
+          // send two messages
+          (*conn)->send(MessageRef{new MPing, false});
+          (*conn)->send(MessageRef{new MPing, false});
+        }).then([server] {
+          server->wait();
+        }).finally([client] {
+          logger().info("client shutdown...");
+          return client->msgr->shutdown();
+        }).finally([server] {
+          logger().info("server shutdown...");
+          return server->msgr->shutdown();
         });
     });
 }
 
+}
+
 int main(int argc, char** argv)
 {
   seastar::app_template app;
@@ -198,7 +380,7 @@ int main(int argc, char** argv)
      "number of pingpong rounds")
     ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
      "ratio of keepalive in ping messages");
-  return app.run(argc, argv, [&] {
+  return app.run(argc, argv, [&app] {
     auto&& config = app.configuration();
     verbose = config["verbose"].as<bool>();
     auto rounds = config["rounds"].as<unsigned>();