]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net/Connection: let send and keepalive be called from foreign cores
authorSamuel Just <sjust@redhat.com>
Thu, 8 Sep 2022 18:40:43 +0000 (18:40 +0000)
committerSamuel Just <sjust@redhat.com>
Tue, 27 Sep 2022 02:35:41 +0000 (19:35 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/net/Connection.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 95d7d807fa0f800730310b9ea724314a455968a8..b2d2b236f2f60146b54eee9cf622bfd8367f4c1a 100644 (file)
@@ -28,6 +28,14 @@ class Interceptor;
 
 using seq_num_t = uint64_t;
 
+/**
+ * Connection
+ *
+ * Abstraction for messenger connections.
+ *
+ * Except when otherwise specified, methods must be invoked from the core on which
+ * the connection originates.
+ */
 class Connection : public seastar::enable_shared_from_this<Connection> {
   entity_name_t peer_name = {0, entity_name_t::NEW};
 
@@ -125,11 +133,22 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   virtual bool peer_wins() const = 0;
 #endif
 
-  /// send a message over a connection that has completed its handshake
+  /**
+   * send
+   *
+   * Send a message over a connection that has completed its handshake.
+   * May be invoked from any core.
+   */
   virtual seastar::future<> send(MessageURef msg) = 0;
 
-  /// send a keepalive message over a connection that has completed its
-  /// handshake
+  /**
+   * keepalive
+   *
+   * Send a keepalive message over a connection that has completed its
+   * handshake.
+   *
+   * May be invoked from any core.
+   */
   virtual seastar::future<> keepalive() = 0;
 
   // close the connection and cancel any any pending futures from read/send,
index 2fb2f1f1ec8fd81854eb6956dc998de8a70207b3..65ab399156bc83ff60d0032fb48b6925f1001414 100644 (file)
@@ -27,7 +27,8 @@ using crimson::common::local_conf;
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    ChainedDispatchers& dispatchers)
-  : messenger(messenger),
+  : core(messenger.shard_id()),
+    messenger(messenger),
     protocol(std::make_unique<ProtocolV2>(dispatchers, *this, messenger))
 {
 #ifdef UNIT_TESTS_BUILT
@@ -72,14 +73,20 @@ bool SocketConnection::peer_wins() const
 
 seastar::future<> SocketConnection::send(MessageURef msg)
 {
-  assert(seastar::this_shard_id() == shard_id());
-  return protocol->send(std::move(msg));
+  return seastar::smp::submit_to(
+    shard_id(),
+    [this, msg=std::move(msg)]() mutable {
+      return protocol->send(std::move(msg));
+    });
 }
 
 seastar::future<> SocketConnection::keepalive()
 {
-  assert(seastar::this_shard_id() == shard_id());
-  return protocol->keepalive();
+  return seastar::smp::submit_to(
+    shard_id(),
+    [this] {
+      return protocol->keepalive();
+    });
 }
 
 void SocketConnection::mark_down()
@@ -128,7 +135,7 @@ SocketConnection::close_clean(bool dispatch_reset)
 }
 
 seastar::shard_id SocketConnection::shard_id() const {
-  return messenger.shard_id();
+  return core;
 }
 
 seastar::socket_address SocketConnection::get_local_address() const {
index 4aad9d65747c638ccff83fcf6f9e643f184e3ed6..b26f77dd714106274f629853be6f955df642f283 100644 (file)
@@ -29,6 +29,7 @@ class SocketConnection;
 using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
 
 class SocketConnection : public Connection {
+  const seastar::shard_id core;
   SocketMessenger& messenger;
   std::unique_ptr<Protocol> protocol;