]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: make ms_dispatch() return optional<future<>> 38360/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 2 Dec 2020 03:25:10 +0000 (11:25 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 2 Dec 2020 03:25:10 +0000 (11:25 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
12 files changed:
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/net/Dispatcher.h
src/crimson/net/chained_dispatchers.cc
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/test/crimson/test_messenger.cc
src/tools/crimson/perf_crimson_msgr.cc

index c00370cf3b31769ccafb4aedc30afaca329f59dd..5aa8a88ba214a7eb59ba7734411264321b662ed6 100644 (file)
@@ -47,7 +47,7 @@ seastar::future<> Client::stop()
   return fut;
 }
 
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
 Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   bool dispatched = true;
@@ -62,7 +62,7 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
       return seastar::now();
     }
   });
-  return {dispatched, seastar::now()};
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
 void Client::ms_handle_connect(crimson::net::ConnectionRef c)
index 555e779bfbc057ebfe0402cc28148551b77bea18..ad7e1fde54e8e0a52c43ebea26277dddeb374c9e 100644 (file)
@@ -37,7 +37,7 @@ public:
   void report();
 
 private:
-  std::tuple<bool, seastar::future<>> ms_dispatch(
+  std::optional<seastar::future<>> ms_dispatch(
       crimson::net::ConnectionRef conn, Ref<Message> m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_connect(crimson::net::ConnectionRef conn) final;
index 5df743aaeb0a28a48cd1f54b3963ec85911fe66b..9dfbb103a38b4ea4f14302ba5cb90afab34d921e 100644 (file)
@@ -518,7 +518,7 @@ bool Client::is_hunting() const {
   return !active_con;
 }
 
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
 Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   bool dispatched = true;
@@ -550,7 +550,7 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
       return seastar::now();
     }
   });
-  return {dispatched, seastar::now()};
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
 void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
index bc8593d60a1e1a5aefc35b63c7eab4e0176a4cd2..e7d2df86393036e2bbfeed5eca32c3a9c6de6c76 100644 (file)
@@ -140,8 +140,8 @@ private:
 private:
   void tick();
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
-                                                  MessageRef m) override;
+  std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
+                                               MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
 
   seastar::future<> handle_monmap(crimson::net::ConnectionRef conn,
index 71be61783b097a5320a6f3e0a9e3894010b81c64..cc6fd4574c7518da11c8471352ba1b0f35d0671d 100644 (file)
@@ -25,11 +25,10 @@ class Dispatcher {
   virtual ~Dispatcher() {}
 
   // Dispatchers are put into a chain as described by chain-of-responsibility
-  // pattern. If any of the dispatchers claims this message, it returns true
-  // to prevent other dispatchers from processing it, and returns a future
-  // to throttle the connection if it's too busy. Else, it returns false and
-  // the second future is ignored.
-  virtual std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
+  // pattern. If any of the dispatchers claims this message, it returns a valid
+  // future to prevent other dispatchers from processing it, and this is also
+  // used to throttle the connection if it's too busy.
+  virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
 
   virtual void ms_handle_accept(ConnectionRef conn) {}
 
index 635794e0db3b9ac3ec842065ba21b95555a4f5e3..b13d40c8f7318d720ed0445ac37ab4e1ee8f7657 100644 (file)
@@ -17,16 +17,15 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
                                 MessageRef m) {
   try {
     for (auto& dispatcher : dispatchers) {
-      auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m);
-      if (dispatched) {
-        return std::move(throttle_future
+      auto dispatched = dispatcher->ms_dispatch(conn, m);
+      if (dispatched.has_value()) {
+        return std::move(*dispatched
         ).handle_exception([conn] (std::exception_ptr eptr) {
           logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
                          *conn, eptr);
           ceph_abort();
         });
       }
-      assert(throttle_future.available());
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_dispatch() {}",
index 3f4fff3aac5c93504a47e43b3e93b18e3cd80458..81ec06ecd5de5421d9237a78fd01657912650b17 100644 (file)
@@ -203,7 +203,7 @@ void Heartbeat::remove_peer(osd_id_t peer)
   peers.erase(peer);
 }
 
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
 Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   bool dispatched = true;
@@ -216,7 +216,7 @@ Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
       return seastar::now();
     }
   });
-  return {dispatched, seastar::now()};
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
 void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
index 46d12463c3db87cbceb18e2f8d25a484b2994490..4947e871ff5adbd582279250abb1cf7beb536500 100644 (file)
@@ -48,7 +48,7 @@ public:
   void set_require_authorizer(bool);
 
   // Dispatcher methods
-  std::tuple<bool, seastar::future<>> ms_dispatch(
+  std::optional<seastar::future<>> ms_dispatch(
       crimson::net::ConnectionRef conn, MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
   void ms_handle_connect(crimson::net::ConnectionRef conn) override;
index 43ccd812c0cbda7bad21bcb7714f23ee6ba1a788..cdd05a64cfe663aa5710f6b3bddca05aeab409ba 100644 (file)
@@ -614,11 +614,11 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
   });
 }
 
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
 OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   if (state.is_stopping()) {
-    return {false, seastar::now()};
+    return {};
   }
   bool dispatched = true;
   gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
@@ -678,7 +678,7 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
       return seastar::now();
     }
   });
-  return {dispatched, seastar::now()};
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
 void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
index 5b6fcc8448c89ae2d49650b3b5f8e85920033ddc..889960ced8d9681aa533b0d189bf097eb18c5a46 100644 (file)
@@ -96,7 +96,7 @@ class OSD final : public crimson::net::Dispatcher,
   OSDSuperblock superblock;
 
   // Dispatcher methods
-  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
+  std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
 
index aed86c3ab14bfb9ebcdf5450691bb961a4231a1a..9e7adbe3bff8a714f8cfa27773799a06e15890e2 100644 (file)
@@ -57,14 +57,14 @@ static seastar::future<> test_echo(unsigned rounds,
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
         if (verbose) {
           logger().info("server got {}", *m);
         }
         // reply with a pong
         std::ignore = c->send(make_message<MPing>());
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
       seastar::future<> init(const entity_name_t& name,
@@ -127,7 +127,7 @@ static seastar::future<> test_echo(unsigned rounds,
         ceph_assert(added);
         session->connected_time = mono_clock::now();
       }
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
         auto session = find_session(c);
         ++(session->count);
@@ -142,7 +142,7 @@ static seastar::future<> test_echo(unsigned rounds,
           ceph_assert(found != pending_conns.end());
           found->second.set_value();
         }
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
       seastar::future<> init(const entity_name_t& name,
@@ -277,7 +277,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       seastar::promise<> on_done; // satisfied when first dispatch unblocks
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef, MessageRef m) override {
         switch (++count) {
         case 1:
@@ -290,7 +290,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
         default:
           throw std::runtime_error("unexpected count");
         }
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
       seastar::future<> wait() { return on_done.get_future(); }
@@ -319,9 +319,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef, MessageRef m) override {
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
       seastar::future<> init(const entity_name_t& name,
@@ -378,10 +378,10 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
         std::ignore = c->send(make_message<MPing>());
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
      public:
@@ -419,9 +419,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       bool stop_send = false;
       seastar::promise<> stopped_send_promise;
 
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef, MessageRef m) override {
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
      public:
@@ -813,7 +813,7 @@ class FailoverSuite : public Dispatcher {
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
     auto result = interceptor.find_result(c);
     if (result == nullptr) {
       logger().error("Untracked ms dispatched connection: {}", *c);
@@ -835,7 +835,7 @@ class FailoverSuite : public Dispatcher {
     }
     logger().info("[Test] got op, left {} ops -- [{}] {}",
                   pending_receive, result->index, *c);
-    return {true, seastar::now()};
+    return {seastar::now()};
   }
 
   void ms_handle_accept(ConnectionRef conn) override {
@@ -1209,7 +1209,7 @@ class FailoverTest : public Dispatcher {
 
   std::unique_ptr<FailoverSuite> test_suite;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
     switch (m->get_type()) {
      case CEPH_MSG_PING:
       ceph_assert(recv_pong);
@@ -1232,7 +1232,7 @@ class FailoverTest : public Dispatcher {
       logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
       ceph_abort();
     }
-    return {true, seastar::now()};
+    return {seastar::now()};
   }
 
  private:
@@ -1407,12 +1407,12 @@ class FailoverSuitePeer : public Dispatcher {
   ConnectionRef tracked_conn;
   unsigned pending_send = 0;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
     logger().info("[TestPeer] got op from Test");
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
     ceph_assert(tracked_conn == c);
     std::ignore = op_callback();
-    return {true, seastar::now()};
+    return {seastar::now()};
   }
 
   void ms_handle_accept(ConnectionRef conn) override {
@@ -1537,7 +1537,7 @@ class FailoverTestPeer : public Dispatcher {
   const entity_addr_t test_peer_addr;
   std::unique_ptr<FailoverSuitePeer> test_suite;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
     ceph_assert(cmd_conn == c);
     switch (m->get_type()) {
      case CEPH_MSG_PING:
@@ -1562,7 +1562,7 @@ class FailoverTestPeer : public Dispatcher {
       logger().error("{} got unexpected msg from cmd client: {}", *c, m);
       ceph_abort();
     }
-    return {true, seastar::now()};
+    return {seastar::now()};
   }
 
   void ms_handle_accept(ConnectionRef conn) override {
index d53c619e9f7ec211f23c3ad7c4ceba0ecb914082..e76f273a921f23a299092e76e67bc53240130448 100644 (file)
@@ -152,7 +152,7 @@ static seastar::future<> run(
         msg_data.append_zero(msg_len);
       }
 
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
@@ -167,7 +167,7 @@ static seastar::future<> run(
         rep->write(0, msg_len, data);
         rep->set_tid(m->get_tid());
         std::ignore = c->send(std::move(rep));
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
       seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
@@ -307,7 +307,7 @@ static seastar::future<> run(
       void ms_handle_connect(crimson::net::ConnectionRef conn) override {
         conn_stats.connected_time = mono_clock::now();
       }
-      std::tuple<bool, seastar::future<>> ms_dispatch(
+      std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef, MessageRef m) override {
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
@@ -327,7 +327,7 @@ static seastar::future<> run(
         ++(conn_stats.received_count);
         depth.signal(1);
 
-        return {true, seastar::now()};
+        return {seastar::now()};
       }
 
       // should start messenger at this shard?