]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: factor out IOHandler::shard_states_t which will be switchable atomically
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 2 Jun 2023 07:45:07 +0000 (15:45 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:32 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 37f1456027abf38516ddf11f9c3d8f210ca0d91f)

src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 22dfa538b19d9a81004936a1e1104f129fb05f8b..b6c3cf694c3f9b2bbb5e22bad0804c2bd96fc1a1 100644 (file)
@@ -47,7 +47,8 @@ namespace crimson::net {
 
 IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                      SocketConnection &conn)
-  : sid(seastar::this_shard_id()),
+  : shard_states(shard_states_t::create(
+        seastar::this_shard_id(), io_state_t::none)),
     dispatchers(dispatchers),
     conn(conn),
     conn_ref(conn.get_local_shared_foreign_from_this())
@@ -56,8 +57,7 @@ IOHandler::IOHandler(ChainedDispatchers &dispatchers,
 IOHandler::~IOHandler()
 {
   // close_io() must be finished
-  ceph_assert(gate.is_closed());
-  assert(!out_exit_dispatching);
+  ceph_assert_always(shard_states->assert_closed_and_exit());
   assert(!conn_ref);
 }
 
@@ -126,11 +126,11 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
 
 seastar::future<> IOHandler::send(MessageFRef msg)
 {
-  if (seastar::this_shard_id() == sid) {
+  if (seastar::this_shard_id() == get_shard_id()) {
     return do_send(std::move(msg));
   } else {
     return seastar::smp::submit_to(
-        sid, [this, msg=std::move(msg)]() mutable {
+        get_shard_id(), [this, msg=std::move(msg)]() mutable {
       return do_send(std::move(msg));
     });
   }
@@ -138,8 +138,8 @@ seastar::future<> IOHandler::send(MessageFRef msg)
 
 seastar::future<> IOHandler::do_send(MessageFRef msg)
 {
-  assert(seastar::this_shard_id() == sid);
-  if (io_state != io_state_t::drop) {
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (get_io_state() != io_state_t::drop) {
     out_pending_msgs.push_back(std::move(msg));
     notify_out_dispatch();
   }
@@ -148,11 +148,11 @@ seastar::future<> IOHandler::do_send(MessageFRef msg)
 
 seastar::future<> IOHandler::send_keepalive()
 {
-  if (seastar::this_shard_id() == sid) {
+  if (seastar::this_shard_id() == get_shard_id()) {
     return do_send_keepalive();
   } else {
     return seastar::smp::submit_to(
-        sid, [this] {
+        get_shard_id(), [this] {
       return do_send_keepalive();
     });
   }
@@ -160,7 +160,7 @@ seastar::future<> IOHandler::send_keepalive()
 
 seastar::future<> IOHandler::do_send_keepalive()
 {
-  assert(seastar::this_shard_id() == sid);
+  assert(seastar::this_shard_id() == get_shard_id());
   if (!need_keepalive) {
     need_keepalive = true;
     notify_out_dispatch();
@@ -170,10 +170,10 @@ seastar::future<> IOHandler::do_send_keepalive()
 
 void IOHandler::mark_down()
 {
-  ceph_assert_always(seastar::this_shard_id() == sid);
-  ceph_assert_always(io_state != io_state_t::none);
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  ceph_assert_always(get_io_state() != io_state_t::none);
   need_dispatch_reset = false;
-  if (io_state == io_state_t::drop) {
+  if (get_io_state() == io_state_t::drop) {
     return;
   }
 
@@ -185,8 +185,9 @@ void IOHandler::mark_down()
 
 void IOHandler::print_io_stat(std::ostream &out) const
 {
+  assert(seastar::this_shard_id() == get_shard_id());
   out << "io_stat("
-      << "io_state=" << fmt::format("{}", io_state)
+      << "io_state=" << fmt::format("{}", get_io_state())
       << ", in_seq=" << in_seq
       << ", out_seq=" << out_seq
       << ", out_pending_msgs_size=" << out_pending_msgs.size()
@@ -202,7 +203,8 @@ void IOHandler::set_io_state(
     FrameAssemblerV2Ref fa,
     bool set_notify_out)
 {
-  auto prv_state = io_state;
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  auto prv_state = get_io_state();
   ceph_assert_always(!(
     (new_state == io_state_t::none && prv_state != io_state_t::none) ||
     (new_state == io_state_t::open && prv_state == io_state_t::open) ||
@@ -232,10 +234,6 @@ void IOHandler::set_io_state(
     assert(fa == nullptr);
     ceph_assert_always(frame_assembler->is_socket_valid());
     frame_assembler->shutdown_socket<false>(nullptr);
-    if (out_dispatching) {
-      ceph_assert_always(!out_exit_dispatching.has_value());
-      out_exit_dispatching = seastar::promise<>();
-    }
   } else {
     assert(fa == nullptr);
   }
@@ -252,9 +250,7 @@ void IOHandler::set_io_state(
 
   // FIXME: simplify and drop the prv_state == new_state case
   if (prv_state != new_state) {
-    io_state = new_state;
-    io_state_changed.set_value();
-    io_state_changed = seastar::promise<>();
+    shard_states->set_io_state(new_state);
   }
 
   /*
@@ -269,25 +265,11 @@ void IOHandler::set_io_state(
 seastar::future<IOHandler::exit_dispatching_ret>
 IOHandler::wait_io_exit_dispatching()
 {
-  ceph_assert_always(io_state != io_state_t::open);
+  ceph_assert_always(get_io_state() != io_state_t::open);
   ceph_assert_always(frame_assembler != nullptr);
   ceph_assert_always(!frame_assembler->is_socket_valid());
-  return seastar::when_all(
-    [this] {
-      if (out_exit_dispatching) {
-        return out_exit_dispatching->get_future();
-      } else {
-        return seastar::now();
-      }
-    }(),
-    [this] {
-      if (in_exit_dispatching) {
-        return in_exit_dispatching->get_future();
-      } else {
-        return seastar::now();
-      }
-    }()
-  ).discard_result().then([this] {
+  return shard_states->wait_io_exit_dispatching(
+  ).then([this] {
     ceph_assert_always(frame_assembler != nullptr);
     ceph_assert_always(!frame_assembler->is_socket_valid());
     return exit_dispatching_ret{
@@ -298,7 +280,7 @@ IOHandler::wait_io_exit_dispatching()
 
 void IOHandler::reset_session(bool full)
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   reset_in();
   if (full) {
     reset_out();
@@ -308,7 +290,7 @@ void IOHandler::reset_session(bool full)
 
 void IOHandler::reset_peer_state()
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   reset_in();
   requeue_out_sent_up_to(0);
   discard_out_sent();
@@ -316,7 +298,7 @@ void IOHandler::reset_peer_state()
 
 void IOHandler::requeue_out_sent()
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty()) {
     return;
   }
@@ -338,7 +320,7 @@ void IOHandler::requeue_out_sent()
 
 void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                    conn, out_seq, seq);
@@ -360,13 +342,13 @@ void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
 
 void IOHandler::reset_in()
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   in_seq = 0;
 }
 
 void IOHandler::reset_out()
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   discard_out_sent();
   out_pending_msgs.clear();
   need_keepalive = false;
@@ -376,14 +358,14 @@ void IOHandler::reset_out()
 
 void IOHandler::discard_out_sent()
 {
-  assert(io_state != io_state_t::open);
+  assert(get_io_state() != io_state_t::open);
   out_seq = 0;
   out_sent_msgs.clear();
 }
 
 void IOHandler::dispatch_accept()
 {
-  if (io_state == io_state_t::drop) {
+  if (get_io_state() == io_state_t::drop) {
     return;
   }
   // protocol_is_connected can be from true to true here if the replacing is
@@ -395,7 +377,7 @@ void IOHandler::dispatch_accept()
 
 void IOHandler::dispatch_connect()
 {
-  if (io_state == io_state_t::drop) {
+  if (get_io_state() == io_state_t::drop) {
     return;
   }
   ceph_assert_always(protocol_is_connected == false);
@@ -406,7 +388,7 @@ void IOHandler::dispatch_connect()
 
 void IOHandler::dispatch_reset(bool is_replace)
 {
-  ceph_assert_always(io_state == io_state_t::drop);
+  ceph_assert_always(get_io_state() == io_state_t::drop);
   if (!need_dispatch_reset) {
     return;
   }
@@ -417,7 +399,7 @@ void IOHandler::dispatch_reset(bool is_replace)
 
 void IOHandler::dispatch_remote_reset()
 {
-  if (io_state == io_state_t::drop) {
+  if (get_io_state() == io_state_t::drop) {
     return;
   }
   ceph_assert_always(conn_ref);
@@ -441,26 +423,18 @@ void IOHandler::ack_out_sent(seq_num_t seq)
 seastar::future<> IOHandler::do_out_dispatch()
 {
   return seastar::repeat([this] {
-    switch (io_state) {
+    switch (get_io_state()) {
      case io_state_t::open: {
       if (unlikely(!is_out_queued())) {
         // try exit open dispatching
         return frame_assembler->flush<false>(
         ).then([this] {
-          if (io_state != io_state_t::open || is_out_queued()) {
+          if (get_io_state() != io_state_t::open || is_out_queued()) {
             return seastar::make_ready_future<stop_t>(stop_t::no);
           }
           // still nothing pending to send after flush,
           // open dispatching can ONLY stop now
-          ceph_assert(out_dispatching);
-          out_dispatching = false;
-          if (unlikely(out_exit_dispatching.has_value())) {
-            out_exit_dispatching->set_value();
-            out_exit_dispatching = std::nullopt;
-            logger().info("{} do_out_dispatch: nothing queued at {},"
-                          " set out_exit_dispatching",
-                          conn, io_state);
-          }
+          shard_states->exit_out_dispatching("exit-open", conn);
           return seastar::make_ready_future<stop_t>(stop_t::yes);
         });
       }
@@ -471,7 +445,7 @@ seastar::future<> IOHandler::do_out_dispatch()
         sweep_out_pending_msgs_to_sent(
           need_keepalive, next_keepalive_ack, to_ack > 0)
       ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
-        if (io_state != io_state_t::open) {
+        if (get_io_state() != io_state_t::open) {
           return frame_assembler->flush<false>(
           ).then([] {
             return seastar::make_ready_future<stop_t>(stop_t::no);
@@ -492,30 +466,17 @@ seastar::future<> IOHandler::do_out_dispatch()
      }
      case io_state_t::delay:
       // delay out dispatching until open
-      if (out_exit_dispatching) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
-      } else {
-        logger().info("{} do_out_dispatch: delay ...", conn);
-      }
-      return io_state_changed.get_future(
+      shard_states->notify_out_dispatching_stopped("delay...", conn);
+      return shard_states->wait_state_change(
       ).then([] { return stop_t::no; });
      case io_state_t::drop:
-      ceph_assert(out_dispatching);
-      out_dispatching = false;
-      if (out_exit_dispatching) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
-      } else {
-        logger().info("{} do_out_dispatch: dropped", conn);
-      }
+      shard_states->exit_out_dispatching("dropped", conn);
       return seastar::make_ready_future<stop_t>(stop_t::yes);
      default:
       ceph_abort("impossible");
     }
   }).handle_exception_type([this](const std::system_error& e) {
+    auto io_state = get_io_state();
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset &&
         e.code() != error::negotiation_failure) {
@@ -559,26 +520,11 @@ void IOHandler::notify_out_dispatch()
   if (need_notify_out) {
     handshake_listener->notify_out();
   }
-  if (out_dispatching) {
-    // already dispatching
-    return;
-  }
-
-  switch (io_state) {
-  case io_state_t::open:
-    [[fallthrough]];
-  case io_state_t::delay:
-    out_dispatching = true;
-    assert(!gate.is_closed());
-    gate.dispatch_in_background("do_out_dispatch", conn, [this] {
+  if (shard_states->try_enter_out_dispatching()) {
+    shard_states->dispatch_in_background(
+        "do_out_dispatch", conn, [this] {
       return do_out_dispatch();
     });
-    return;
-  case io_state_t::drop:
-    // do not dispatch out
-    return;
-  default:
-    ceph_abort("impossible");
   }
 }
 
@@ -589,9 +535,9 @@ IOHandler::read_message(
 {
   return frame_assembler->read_frame_payload<false>(
   ).then([this, throttle_stamp, msg_size](auto payload) {
-    if (unlikely(io_state != io_state_t::open)) {
+    if (unlikely(get_io_state() != io_state_t::open)) {
       logger().debug("{} triggered {} during read_message()",
-                     conn, io_state);
+                     conn, get_io_state());
       abort_protocol();
     }
 
@@ -693,7 +639,7 @@ IOHandler::read_message(
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
-    assert(io_state == io_state_t::open);
+    assert(get_io_state() == io_state_t::open);
     ceph_assert_always(conn_ref);
     // throttle the reading process by the returned future
     return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
@@ -702,9 +648,8 @@ IOHandler::read_message(
 
 void IOHandler::do_in_dispatch()
 {
-  ceph_assert_always(!in_exit_dispatching.has_value());
-  in_exit_dispatching = seastar::promise<>();
-  gate.dispatch_in_background(
+  shard_states->enter_in_dispatching();
+  shard_states->dispatch_in_background(
       "do_in_dispatch", conn, [this] {
     return seastar::keep_doing([this] {
       return frame_assembler->read_main_preamble<false>(
@@ -786,6 +731,7 @@ void IOHandler::do_in_dispatch()
         e_what = e.what();
       }
 
+      auto io_state = get_io_state();
       if (io_state == io_state_t::open) {
         logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
                       conn, io_state, e_what);
@@ -798,9 +744,7 @@ void IOHandler::do_in_dispatch()
                       conn, io_state, e_what);
       }
     }).finally([this] {
-      ceph_assert_always(in_exit_dispatching.has_value());
-      in_exit_dispatching->set_value();
-      in_exit_dispatching = std::nullopt;
+      shard_states->exit_in_dispatching();
     });
   });
 }
@@ -808,7 +752,7 @@ void IOHandler::do_in_dispatch()
 seastar::future<>
 IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
 {
-  ceph_assert_always(io_state == io_state_t::drop);
+  ceph_assert_always(get_io_state() == io_state_t::drop);
 
   if (is_dispatch_reset) {
     dispatch_reset(is_replace);
@@ -817,8 +761,56 @@ IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
   ceph_assert_always(conn_ref);
   conn_ref.reset();
 
+  return shard_states->close(
+  ).then([this] {
+    assert(shard_states->assert_closed_and_exit());
+  });
+}
+
+/*
+ * IOHandler::shard_states_t
+ */
+
+void
+IOHandler::shard_states_t::notify_out_dispatching_stopped(
+    const char *what, SocketConnection &conn)
+{
+  assert(seastar::this_shard_id() == sid);
+  if (unlikely(out_exit_dispatching.has_value())) {
+    out_exit_dispatching->set_value();
+    out_exit_dispatching = std::nullopt;
+    logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
+                  conn, what, io_state);
+  } else {
+    if (unlikely(io_state != io_state_t::open)) {
+      logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
+                    conn, what, io_state);
+    }
+  }
+}
+
+seastar::future<>
+IOHandler::shard_states_t::wait_io_exit_dispatching()
+{
+  assert(seastar::this_shard_id() == sid);
+  assert(io_state != io_state_t::open);
   assert(!gate.is_closed());
-  return gate.close();
+  return seastar::when_all(
+    [this] {
+      if (out_exit_dispatching) {
+        return out_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }(),
+    [this] {
+      if (in_exit_dispatching) {
+        return in_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }()
+  ).discard_result();
 }
 
 } // namespace crimson::net
index f3220d994ba9de412581fb3abb1375f63034c4ff..76e0cc010bc87a8a59ac0ed72e8652495b4094d1 100644 (file)
@@ -110,11 +110,11 @@ public:
  */
 private:
   seastar::shard_id get_shard_id() const final {
-    return sid;
+    return shard_states->get_shard_id();
   }
 
   bool is_connected() const final {
-    ceph_assert_always(seastar::this_shard_id() == sid);
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return protocol_is_connected;
   }
 
@@ -123,17 +123,17 @@ private:
   seastar::future<> send_keepalive() final;
 
   clock_t::time_point get_last_keepalive() const final {
-    ceph_assert_always(seastar::this_shard_id() == sid);
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return last_keepalive;
   }
 
   clock_t::time_point get_last_keepalive_ack() const final {
-    ceph_assert_always(seastar::this_shard_id() == sid);
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return last_keepalive_ack;
   }
 
   void set_last_keepalive_ack(clock_t::time_point when) final {
-    ceph_assert_always(seastar::this_shard_id() == sid);
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     last_keepalive_ack = when;
   }
 
@@ -199,6 +199,138 @@ public:
   void dispatch_connect();
 
  private:
+  class shard_states_t;
+  using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+  class shard_states_t {
+  public:
+    shard_states_t(seastar::shard_id _sid, io_state_t state)
+      : sid{_sid}, io_state{state} {}
+
+    seastar::shard_id get_shard_id() const {
+      return sid;
+    }
+
+    io_state_t get_io_state() const {
+      assert(seastar::this_shard_id() == sid);
+      return io_state;
+    }
+
+    void set_io_state(io_state_t new_state) {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state != new_state);
+      pr_io_state_changed.set_value();
+      pr_io_state_changed = seastar::promise<>();
+      if (io_state == io_state_t::open) {
+        // from open
+        if (out_dispatching) {
+          ceph_assert_always(!out_exit_dispatching.has_value());
+          out_exit_dispatching = seastar::promise<>();
+        }
+      }
+      io_state = new_state;
+    }
+
+    seastar::future<> wait_state_change() {
+      assert(seastar::this_shard_id() == sid);
+      return pr_io_state_changed.get_future();
+    }
+
+    template <typename Func>
+    void dispatch_in_background(
+        const char *what, SocketConnection &who, Func &&func) {
+      assert(seastar::this_shard_id() == sid);
+      ceph_assert_always(!gate.is_closed());
+      gate.dispatch_in_background(what, who, std::move(func));
+    }
+
+    void enter_in_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state == io_state_t::open);
+      ceph_assert_always(!in_exit_dispatching.has_value());
+      in_exit_dispatching = seastar::promise<>();
+    }
+
+    void exit_in_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state != io_state_t::open);
+      ceph_assert_always(in_exit_dispatching.has_value());
+      in_exit_dispatching->set_value();
+      in_exit_dispatching = std::nullopt;
+    }
+
+    bool try_enter_out_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      if (out_dispatching) {
+        // already dispatching out
+        return false;
+      }
+      switch (io_state) {
+      case io_state_t::open:
+        [[fallthrough]];
+      case io_state_t::delay:
+        out_dispatching = true;
+        return true;
+      case io_state_t::drop:
+        // do not dispatch out
+        return false;
+      default:
+        ceph_abort("impossible");
+      }
+    }
+
+    void notify_out_dispatching_stopped(
+        const char *what, SocketConnection &conn);
+
+    void exit_out_dispatching(
+        const char *what, SocketConnection &conn) {
+      assert(seastar::this_shard_id() == sid);
+      ceph_assert_always(out_dispatching);
+      out_dispatching = false;
+      notify_out_dispatching_stopped(what, conn);
+    }
+
+    seastar::future<> wait_io_exit_dispatching();
+
+    seastar::future<> close() {
+      assert(seastar::this_shard_id() == sid);
+      assert(!gate.is_closed());
+      return gate.close();
+    }
+
+    bool assert_closed_and_exit() const {
+      assert(seastar::this_shard_id() == sid);
+      if (gate.is_closed()) {
+        ceph_assert_always(io_state == io_state_t::drop);
+        ceph_assert_always(!out_dispatching);
+        ceph_assert_always(!out_exit_dispatching);
+        ceph_assert_always(!in_exit_dispatching);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    static shard_states_ref_t create(
+        seastar::shard_id sid, io_state_t state) {
+      return std::make_unique<shard_states_t>(sid, state);
+    }
+
+  private:
+    const seastar::shard_id sid;
+    io_state_t io_state;
+
+    crimson::common::Gated gate;
+    seastar::promise<> pr_io_state_changed;
+    bool out_dispatching = false;
+    std::optional<seastar::promise<>> out_exit_dispatching;
+    std::optional<seastar::promise<>> in_exit_dispatching;
+  };
+
+  io_state_t get_io_state() const {
+    return shard_states->get_io_state();
+  }
+
   seastar::future<> do_send(MessageFRef msg);
 
   seastar::future<> do_send_keepalive();
@@ -244,7 +376,7 @@ public:
   void do_in_dispatch();
 
 private:
-  seastar::shard_id sid;
+  shard_states_ref_t shard_states;
 
   ChainedDispatchers &dispatchers;
 
@@ -255,27 +387,16 @@ private:
 
   HandshakeListener *handshake_listener = nullptr;
 
-  crimson::common::Gated gate;
-
   FrameAssemblerV2Ref frame_assembler;
 
   bool protocol_is_connected = false;
 
   bool need_dispatch_reset = true;
 
-  io_state_t io_state = io_state_t::none;
-
-  // wait until current io_state changed
-  seastar::promise<> io_state_changed;
-
   /*
    * out states for writing
    */
 
-  bool out_dispatching = false;
-
-  std::optional<seastar::promise<>> out_exit_dispatching;
-
   /// the seq num of the last transmitted message
   seq_num_t out_seq = 0;
 
@@ -297,8 +418,6 @@ private:
    * in states for reading
    */
 
-  std::optional<seastar::promise<>> in_exit_dispatching;
-
   /// the seq num of the last received message
   seq_num_t in_seq = 0;