]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: return tuple<bool, future<>> from ms_dispatch()
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 27 Nov 2020 04:53:24 +0000 (12:53 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 27 Nov 2020 05:13:40 +0000 (13:13 +0800)
Return a boolean to notify whether the Dispatcher has claimed the
message. And fix all the ms_dispatch() to return future only for
throttling purposes, which is currently always seastar::now().

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
13 files changed:
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/net/Dispatcher.h
src/crimson/net/chained_dispatchers.cc
src/crimson/net/chained_dispatchers.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/test/crimson/test_messenger.cc
src/tools/crimson/perf_crimson_msgr.cc

index fcc8f8326070ade4a7638e9713ea709b2f82b1d5..a98480ac8b519e7a881b80a9137e447f7739e1a3 100644 (file)
@@ -47,19 +47,22 @@ seastar::future<> Client::stop()
   return fut;
 }
 
-seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn,
-                                      MessageRef m)
+std::tuple<bool, seastar::future<>>
+Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
 {
-  return gate.dispatch(__func__, *this, [this, conn, &m] {
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
     switch(m->get_type()) {
     case MSG_MGR_MAP:
       return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
     case MSG_MGR_CONFIGURE:
       return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
     default:
+      dispatched = false;
       return seastar::now();
     }
   });
+  return {dispatched, seastar::now()};
 }
 
 void Client::ms_handle_connect(crimson::net::ConnectionRef c)
index 19e4cd6ee25d43ef13a3f57133b971ce259919fa..78640a3509aba0518cfcbfb0d2c3667b1d609c10 100644 (file)
@@ -37,8 +37,8 @@ public:
   void report();
 
 private:
-  seastar::future<> ms_dispatch(crimson::net::Connection* conn,
-                               Ref<Message> m) override;
+  std::tuple<bool, seastar::future<>> ms_dispatch(
+      crimson::net::Connection* conn, Ref<Message> m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_connect(crimson::net::ConnectionRef conn) final;
   seastar::future<> handle_mgr_map(crimson::net::Connection* conn,
index 9be91ce84972ee413db260e5f83a5ef5eb547eba..835bce01cae7c0bebc12916f13b295e99aff3b92 100644 (file)
@@ -518,10 +518,11 @@ bool Client::is_hunting() const {
   return !active_con;
 }
 
-seastar::future<>
+std::tuple<bool, seastar::future<>>
 Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
 {
-  return gate.dispatch(__func__, *this, [this, conn, &m] {
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
     // we only care about these message types
     switch (m->get_type()) {
     case CEPH_MSG_MON_MAP:
@@ -545,9 +546,11 @@ Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
       return handle_config(
        boost::static_pointer_cast<MConfig>(m));
     default:
+      dispatched = false;
       return seastar::now();
     }
   });
+  return {dispatched, seastar::now()};
 }
 
 void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
index 0f651dd5d1769c68401c5d15e3a03d9159187732..bf5daa850ca9cab44a86ae4c8ffaa98a5bd3e6e4 100644 (file)
@@ -140,8 +140,8 @@ private:
 private:
   void tick();
 
-  seastar::future<> ms_dispatch(crimson::net::Connection* conn,
-                               MessageRef m) override;
+  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::Connection* conn,
+                                                  MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
 
   seastar::future<> handle_monmap(crimson::net::Connection* conn,
index fd26d146166a7a42eddfe398d55ed72bbe28b9bb..7c39277f6b14e38a34b53f2b012a035d91963d39 100644 (file)
@@ -31,9 +31,15 @@ class Dispatcher : public boost::intrusive::slist_base_hook<
  public:
   virtual ~Dispatcher() {}
 
-  virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) {
-    return seastar::make_ready_future<>();
+  // Dispatchers are put into a chain as described by chain-of-responsibility
+  // pattern. If any of the dispatchers claims this message, it returns true
+  // to prevent other dispatchers from processing it, and returns a future
+  // to throttle the connection if it's too busy. Else, it returns false and
+  // the second future is ignored.
+  virtual std::tuple<bool, seastar::future<>> ms_dispatch(Connection* conn, MessageRef m) {
+    return {false, seastar::now<>()};
   }
+
   virtual void ms_handle_accept(ConnectionRef conn) {}
 
   virtual void ms_handle_connect(ConnectionRef conn) {}
index 1977e1da4d74775bc1197cbf0795db8203b87164..717a2db748eceee5ddbb8e38b36dad1b6403e95d 100644 (file)
@@ -13,19 +13,27 @@ seastar::future<>
 ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
                                 MessageRef m) {
   try {
-    return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) {
-      return dispatcher.ms_dispatch(conn, m);
-    }).handle_exception([conn] (std::exception_ptr eptr) {
-      logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
-                     *conn, eptr);
-      ceph_abort();
-    });
+    for (auto& dispatcher : dispatchers) {
+      auto [dispatched, throttle_future] = dispatcher.ms_dispatch(conn, m);
+      if (dispatched) {
+        return std::move(throttle_future
+        ).handle_exception([conn] (std::exception_ptr eptr) {
+          logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
+                         *conn, eptr);
+          ceph_abort();
+        });
+      }
+      assert(throttle_future.available());
+    }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_dispatch() {}",
                    *conn, std::current_exception());
     ceph_abort();
-    return seastar::now();
   }
+  if (!dispatchers.empty()) {
+    logger().error("ms_dispatch unhandled message {}", *m);
+  }
+  return seastar::now();
 }
 
 void
index a5facef2b96de0d4b971676c66147be71f00eb9d..139a825e9445ebec4c392839788dd4e6b5b56326 100644 (file)
 
 using crimson::net::Dispatcher;
 
-// in existing Messenger, dispatchers are put into a chain as described by
-// chain-of-responsibility pattern. we could do the same to stop processing
-// the message once any of the dispatchers claims this message, and prevent
-// other dispatchers from reading it. but this change is more involved as
-// it requires changing the ms_ methods to return a bool. so as an intermediate 
-// solution, we are using an observer dispatcher to notify all the interested
-// or unintersted parties.
 class ChainedDispatchers {
   boost::intrusive::slist<
     Dispatcher,
index d8dc1e550e168cfce2f938d359d9488c88325225..0f8b20768e46d7d0084de7b8acfe54eff943360d 100644 (file)
@@ -206,17 +206,20 @@ void Heartbeat::remove_peer(osd_id_t peer)
   peers.erase(peer);
 }
 
-seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
-                                         MessageRef m)
+std::tuple<bool, seastar::future<>>
+Heartbeat::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
 {
-  return gate.dispatch(__func__, *this, [this, conn, &m] {
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
     switch (m->get_type()) {
     case MSG_OSD_PING:
       return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
     default:
+      dispatched = false;
       return seastar::now();
     }
   });
+  return {dispatched, seastar::now()};
 }
 
 void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
index b8d13ee356712bade9d27914c5a7d90ef063ba4b..9d85b526ca25380b9cd9508b5f95b7a1c7ef1cb8 100644 (file)
@@ -48,8 +48,8 @@ public:
   void set_require_authorizer(bool);
 
   // Dispatcher methods
-  seastar::future<> ms_dispatch(crimson::net::Connection* conn,
-                               MessageRef m) override;
+  std::tuple<bool, seastar::future<>> ms_dispatch(
+      crimson::net::Connection* conn, MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
   void ms_handle_connect(crimson::net::ConnectionRef conn) override;
   void ms_handle_accept(crimson::net::ConnectionRef conn) override;
index 430d8ed2ebb0e28a132e5b4d34f53aa5735d772b..c169ece16000b18e1838fc1cb481478fd864a6fa 100644 (file)
@@ -617,12 +617,14 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
   });
 }
 
-seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+std::tuple<bool, seastar::future<>>
+OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
 {
-  return gate.dispatch(__func__, *this, [this, conn, &m] {
-    if (state.is_stopping()) {
-      return seastar::now();
-    }
+  if (state.is_stopping()) {
+    return {false, seastar::now()};
+  }
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
     switch (m->get_type()) {
     case CEPH_MSG_OSD_MAP:
       return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
@@ -675,10 +677,11 @@ seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
     case MSG_OSD_SCRUB2:
       return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
     default:
-      logger().info("ms_dispatch unhandled message {}", *m);
+      dispatched = false;
       return seastar::now();
     }
   });
+  return {dispatched, seastar::now()};
 }
 
 void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
index 72a892fb2ffbca7182e7907b8c9e5f92cce73f6b..eac8be594279e18e2106f86046b2e6a11df13f1a 100644 (file)
@@ -97,7 +97,7 @@ class OSD final : public crimson::net::Dispatcher,
   OSDSuperblock superblock;
 
   // Dispatcher methods
-  seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final;
+  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::Connection*, MessageRef) final;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
 
index 429509c435235a50f9052c89aaf7456c94277525..74c6aeadb3845378e6dadee9cc12da1c25febbb8 100644 (file)
@@ -48,13 +48,14 @@ static seastar::future<> test_echo(unsigned rounds,
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
         if (verbose) {
           logger().info("server got {}", *m);
         }
         // reply with a pong
-        return c->send(make_message<MPing>());
+        std::ignore = c->send(make_message<MPing>());
+        return {true, seastar::now()};
       }
 
       seastar::future<> init(const entity_name_t& name,
@@ -114,8 +115,8 @@ static seastar::future<> test_echo(unsigned rounds,
         ceph_assert(added);
         session->connected_time = mono_clock::now();
       }
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
         auto session = find_session(c);
         ++(session->count);
         if (verbose) {
@@ -129,7 +130,7 @@ static seastar::future<> test_echo(unsigned rounds,
           ceph_assert(found != pending_conns.end());
           found->second.set_value();
         }
-        return seastar::now();
+        return {true, seastar::now()};
       }
 
       seastar::future<> init(const entity_name_t& name,
@@ -268,20 +269,20 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       seastar::promise<> on_done; // satisfied when first dispatch unblocks
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
         switch (++count) {
         case 1:
           // block on the first request until we reenter with the second
-          return on_second.get_future().then([this] {
-            on_done.set_value();
-          });
+          std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
+          break;
         case 2:
           on_second.set_value();
-          return seastar::now();
+          break;
         default:
           throw std::runtime_error("unexpected count");
         }
+        return {true, seastar::now()};
       }
 
       seastar::future<> wait() { return on_done.get_future(); }
@@ -364,9 +365,10 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
-        return c->send(make_message<MPing>());
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
+        std::ignore = c->send(make_message<MPing>());
+        return {true, seastar::now()};
       }
 
      public:
@@ -401,9 +403,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       bool stop_send = false;
       seastar::promise<> stopped_send_promise;
 
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
-        return seastar::now();
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
+        return {true, seastar::now()};
       }
 
      public:
@@ -798,7 +800,7 @@ class FailoverSuite : public Dispatcher {
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
-  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
     auto result = interceptor.find_result(c->shared_from_this());
     if (result == nullptr) {
       logger().error("Untracked ms dispatched connection: {}", *c);
@@ -820,7 +822,7 @@ class FailoverSuite : public Dispatcher {
     }
     logger().info("[Test] got op, left {} ops -- [{}] {}",
                   pending_receive, result->index, *c);
-    return seastar::now();
+    return {true, seastar::now()};
   }
 
   void ms_handle_accept(ConnectionRef conn) override {
@@ -1192,29 +1194,30 @@ class FailoverTest : public Dispatcher {
 
   std::unique_ptr<FailoverSuite> test_suite;
 
-  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
     switch (m->get_type()) {
      case CEPH_MSG_PING:
       ceph_assert(recv_pong);
       recv_pong->set_value();
       recv_pong = std::nullopt;
-      return seastar::now();
+      break;
      case MSG_COMMAND_REPLY:
       ceph_assert(recv_cmdreply);
       recv_cmdreply->set_value();
       recv_cmdreply = std::nullopt;
-      return seastar::now();
+      break;
      case MSG_COMMAND: {
       auto m_cmd = boost::static_pointer_cast<MCommand>(m);
       ceph_assert(static_cast<cmd_t>(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op);
       ceph_assert(test_suite);
       test_suite->notify_peer_reply();
-      return seastar::now();
+      break;
      }
      default:
       logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
       ceph_abort();
     }
+    return {true, seastar::now()};
   }
 
  private:
@@ -1391,11 +1394,12 @@ class FailoverSuitePeer : public Dispatcher {
   ConnectionRef tracked_conn;
   unsigned pending_send = 0;
 
-  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
     logger().info("[TestPeer] got op from Test");
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
     ceph_assert(tracked_conn == c->shared_from_this());
-    return op_callback();
+    std::ignore = op_callback();
+    return {true, seastar::now()};
   }
 
   void ms_handle_accept(ConnectionRef conn) override {
@@ -1518,29 +1522,32 @@ class FailoverTestPeer : public Dispatcher {
   const entity_addr_t test_peer_addr;
   std::unique_ptr<FailoverSuitePeer> test_suite;
 
-  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
     ceph_assert(cmd_conn == c->shared_from_this());
     switch (m->get_type()) {
      case CEPH_MSG_PING:
-      return c->send(make_message<MPing>());
+      std::ignore = c->send(make_message<MPing>());
+      break;
      case MSG_COMMAND: {
       auto m_cmd = boost::static_pointer_cast<MCommand>(m);
       auto cmd = static_cast<cmd_t>(m_cmd->cmd[0][0]);
       if (cmd == cmd_t::shutdown) {
         logger().info("CmdSrv shutdown...");
         // forwarded to FailoverTestPeer::wait()
-       cmd_msgr->remove_dispatcher(*this);
-        (void) cmd_msgr->shutdown();
-        return seastar::now();
+        cmd_msgr->stop();
+        std::ignore = cmd_msgr->shutdown();
+      } else {
+        std::ignore = handle_cmd(cmd, m_cmd).then([c] {
+          return c->send(make_message<MCommandReply>());
+        });
       }
-      return handle_cmd(cmd, m_cmd).then([c] {
-        return c->send(make_message<MCommandReply>());
-      });
+      break;
      }
      default:
       logger().error("{} got unexpected msg from cmd client: {}", *c, m);
       ceph_abort();
     }
+    return {true, seastar::now()};
   }
 
   void ms_handle_accept(ConnectionRef conn) override {
index 1efef3a2f574c5ce83bf15621adda54e5b27101b..9e939a125fd7875dc82a8b58491ab231e65ccc1a 100644 (file)
@@ -152,8 +152,8 @@ static seastar::future<> run(
         msg_data.append_zero(msg_len);
       }
 
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
         // server replies with MOSDOp to generate server-side write workload
@@ -166,7 +166,8 @@ static seastar::future<> run(
         bufferlist data(msg_data);
         rep->write(0, msg_len, data);
         rep->set_tid(m->get_tid());
-        return c->send(std::move(rep));
+        std::ignore = c->send(std::move(rep));
+        return {true, seastar::now()};
       }
 
       seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
@@ -302,8 +303,8 @@ static seastar::future<> run(
       void ms_handle_connect(crimson::net::ConnectionRef conn) override {
         conn_stats.connected_time = mono_clock::now();
       }
-      seastar::future<> ms_dispatch(crimson::net::Connection* c,
-                                    MessageRef m) override {
+      std::tuple<bool, seastar::future<>> ms_dispatch(
+          crimson::net::Connection* c, MessageRef m) override {
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
@@ -322,7 +323,7 @@ static seastar::future<> run(
         ++(conn_stats.received_count);
         depth.signal(1);
 
-        return seastar::now();
+        return {true, seastar::now()};
       }
 
       // should start messenger at this shard?