}
     };
 
-    struct Client final
-        : public crimson::net::Dispatcher {
+    class Client final
+        : public crimson::net::Dispatcher,
+          public seastar::peering_sharded_service<Client> {
+    public:
+      Client(seastar::shard_id primary_sid,
+             unsigned rounds,
+             double keepalive_ratio,
+             ShardedGates *gates)
+        : primary_sid{primary_sid},
+          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
+          rounds(rounds),
+          gates{*gates} {}
+
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce) {
+        assert(seastar::this_shard_id() == primary_sid);
+        msgr = crimson::net::Messenger::create(
+            name, lname, nonce, false);
+        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->start({this});
+      }
+
+      seastar::future<> shutdown() {
+        assert(seastar::this_shard_id() == primary_sid);
+        ceph_assert(msgr);
+       msgr->stop();
+        return msgr->shutdown();
+      }
+
+      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
+        assert(seastar::this_shard_id() == primary_sid);
+        mono_time start_time = mono_clock::now();
+        auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+        return seastar::futurize_invoke([this, conn] {
+          return do_dispatch_pingpong(conn);
+        }).then([] {
+          // 500ms should be enough to establish the connection
+          return seastar::sleep(500ms);
+        }).then([this, conn, start_time] {
+          return container().invoke_on(
+              conn->get_shard_id(),
+              [pconn=&*conn, start_time](auto &local) {
+            assert(pconn->is_connected());
+            auto session = local.find_session(pconn);
+            std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+            std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+            logger().info("{}: handshake {}, pingpong {}",
+                          *pconn, dur_handshake.count(), dur_pingpong.count());
+          }).then([conn] {});
+        });
+      }
+
+      static seastar::future<Client*> create(
+          unsigned rounds,
+          double keepalive_ratio,
+          ShardedGates *gates) {
+        return create_sharded<Client>(
+          seastar::this_shard_id(),
+          rounds,
+          keepalive_ratio,
+          gates);
+      }
+
+     private:
       struct PingSession : public seastar::enable_shared_from_this<PingSession> {
         unsigned count = 0u;
         mono_time connected_time;
       };
       using PingSessionRef = seastar::shared_ptr<PingSession>;
 
-      unsigned rounds;
-      std::bernoulli_distribution keepalive_dist;
-      crimson::net::MessengerRef msgr;
-      std::map<crimson::net::ConnectionRef, seastar::promise<>> pending_conns;
-      std::map<crimson::net::ConnectionRef, PingSessionRef> sessions;
-      crimson::auth::DummyAuthClientServer dummy_auth;
-
-      Client(unsigned rounds, double keepalive_ratio)
-        : rounds(rounds),
-          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
-
-      PingSessionRef find_session(crimson::net::ConnectionRef c) {
-        auto found = sessions.find(c);
-        if (found == sessions.end()) {
-          ceph_assert(false);
-        }
-        return found->second;
-      }
-
       void ms_handle_connect(
           crimson::net::ConnectionRef conn,
           seastar::shard_id prv_shard) override {
+        auto &local = container().local();
         assert(prv_shard == seastar::this_shard_id());
         auto session = seastar::make_shared<PingSession>();
-        auto [i, added] = sessions.emplace(conn, session);
+        auto [i, added] = local.sessions.emplace(&*conn, session);
         std::ignore = i;
         ceph_assert(added);
         session->connected_time = mono_clock::now();
       }
+
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
-        auto session = find_session(c);
+        auto &local = container().local();
+        auto session = local.find_session(&*c);
         ++(session->count);
         if (verbose) {
           logger().info("client ms_dispatch {}", session->count);
         if (session->count == rounds) {
           logger().info("{}: finished receiving {} pongs", *c, session->count);
           session->finish_time = mono_clock::now();
-          auto found = pending_conns.find(c);
-          ceph_assert(found != pending_conns.end());
-          found->second.set_value();
+          gates.dispatch_in_background("echo_notify_done", [c, this] {
+            return container().invoke_on(primary_sid, [pconn=&*c](auto &local) {
+              auto found = local.pending_conns.find(pconn);
+              ceph_assert(found != local.pending_conns.end());
+              found->second.set_value();
+            }).then([c] {});
+          });
         }
         return {seastar::now()};
       }
 
-      seastar::future<> init(const entity_name_t& name,
-                             const std::string& lname,
-                             const uint64_t nonce) {
-        msgr = crimson::net::Messenger::create(
-            name, lname, nonce, true);
-        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-        msgr->set_auth_client(&dummy_auth);
-        msgr->set_auth_server(&dummy_auth);
-        return msgr->start({this});
-      }
-
-      seastar::future<> shutdown() {
-        ceph_assert(msgr);
-       msgr->stop();
-        return msgr->shutdown();
-      }
-
-      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
-        mono_time start_time = mono_clock::now();
-        auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
-        return seastar::futurize_invoke([this, conn] {
-          return do_dispatch_pingpong(conn);
-        }).then([this, conn, start_time] {
-          auto session = find_session(conn);
-          std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
-          std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
-          logger().info("{}: handshake {}, pingpong {}",
-                        *conn, dur_handshake.count(), dur_pingpong.count());
-        });
+      PingSessionRef find_session(crimson::net::Connection *c) {
+        auto found = sessions.find(c);
+        if (found == sessions.end()) {
+          ceph_assert(false);
+        }
+        return found->second;
       }
 
-     private:
       seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
-        auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
+        auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>());
         std::ignore = i;
         ceph_assert(added);
         return seastar::do_with(0u, 0u,
             },
             [this, conn, &count_ping, &count_keepalive] {
               return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
-                  if (keepalive_dist(rng)) {
-                    return conn->send_keepalive()
-                      .then([&count_keepalive] {
-                        count_keepalive += 1;
-                        return seastar::make_ready_future<seastar::stop_iteration>(
-                          seastar::stop_iteration::no);
-                      });
-                  } else {
-                    return conn->send(crimson::make_message<MPing>())
-                      .then([&count_ping] {
-                        count_ping += 1;
-                        return seastar::make_ready_future<seastar::stop_iteration>(
-                          seastar::stop_iteration::yes);
-                      });
-                  }
-                });
+                if (keepalive_dist(rng)) {
+                  return conn->send_keepalive(
+                  ).then([&count_keepalive] {
+                    count_keepalive += 1;
+                    return seastar::make_ready_future<seastar::stop_iteration>(
+                      seastar::stop_iteration::no);
+                  });
+                } else {
+                  return conn->send(crimson::make_message<MPing>()
+                  ).then([&count_ping] {
+                    count_ping += 1;
+                    return seastar::make_ready_future<seastar::stop_iteration>(
+                      seastar::stop_iteration::yes);
+                  });
+                }
+              });
             }).then([this, conn] {
-              auto found = pending_conns.find(conn);
+              auto found = pending_conns.find(&*conn);
+              assert(found != pending_conns.end());
               return found->second.get_future();
             }
           );
         });
       }
+
+    private:
+      // primary shard only
+      const seastar::shard_id primary_sid;
+      std::bernoulli_distribution keepalive_dist;
+      crimson::net::MessengerRef msgr;
+      std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
+      crimson::auth::DummyAuthClientServer dummy_auth;
+
+      // per shard
+      const unsigned rounds;
+      std::map<crimson::net::Connection*, PingSessionRef> sessions;
+      ShardedGates &gates;
     };
   };
 
                 rounds, keepalive_ratio);
   return ShardedGates::create(
   ).then([rounds, keepalive_ratio](auto *gates) {
+    return seastar::when_all_succeed(
+      test_state::Client::create(rounds, keepalive_ratio, gates),
+      test_state::Client::create(rounds, keepalive_ratio, gates),
+      seastar::make_ready_future<ShardedGates*>(gates));
+  }).then_unpack([](auto *client1, auto *client2, auto *gates) {
     auto server1 = seastar::make_shared<test_state::Server>(*gates);
     auto server2 = seastar::make_shared<test_state::Server>(*gates);
-    auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
-    auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
     // start servers and clients
     auto addr1 = get_server_addr();
     auto addr2 = get_server_addr();
     // dispatch pingpoing
     ).then_unpack([client1, client2, server1, server2] {
       return seastar::when_all_succeed(
-          // test connecting in parallel, accepting in parallel
-          client1->dispatch_pingpong(server2->msgr->get_myaddr()),
-          client2->dispatch_pingpong(server1->msgr->get_myaddr()));
+        // test connecting in parallel, accepting in parallel
+        client1->dispatch_pingpong(server1->msgr->get_myaddr()),
+        client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+        client2->dispatch_pingpong(server1->msgr->get_myaddr()),
+        client2->dispatch_pingpong(server2->msgr->get_myaddr()));
     // shutdown
-    }).then_unpack([] {
-      return seastar::now();
-    }).then([client1] {
+    }).then_unpack([client1] {
       logger().info("client1 shutdown...");
       return client1->shutdown();
     }).then([client2] {
       return server2->shutdown();
     }).then([] {
       logger().info("test_echo() done!\n");
-    }).handle_exception([server1, server2, client1, client2] (auto eptr) {
+    }).handle_exception([](auto eptr) {
       logger().error("test_echo() failed: got exception {}", eptr);
       throw;
-    }).finally([gates] {
+    }).finally([gates, server1, server2] {
       return gates->close();
     });
   });