]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: make it explict about the FrameAssemberV2 ownership
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 2 Dec 2022 08:28:36 +0000 (16:28 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
FrameAssemblerV2 is owned by ProtocolV2 during handshake, and owned by
Protocol during messaging.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 6165e2e57dccfaf2ceb6bacedd93acd53700b4ca..1b38263335a6a8a78ca346f8e6cbcfcb3e0d3e7b 100644 (file)
@@ -289,4 +289,9 @@ void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
                  (int)main_preamble->num_segments, main_preamble->crc);
 }
 
+FrameAssemblerV2Ref FrameAssemblerV2::create(SocketConnection &conn)
+{
+  return std::make_unique<FrameAssemblerV2>(conn);
+}
+
 } // namespace crimson::net
index 3165a048fc19dc4293f41998aeb927a36bc64152..06c5cb25eee05fbc77affe2b45ab6e4abe412f67 100644 (file)
@@ -12,6 +12,8 @@
 namespace crimson::net {
 
 class SocketConnection;
+class FrameAssemblerV2;
+using FrameAssemblerV2Ref = std::unique_ptr<FrameAssemblerV2>;
 
 class FrameAssemblerV2 {
 public:
@@ -120,6 +122,8 @@ public:
     return write_flush(std::move(bl));
   }
 
+  static FrameAssemblerV2Ref create(SocketConnection &conn);
+
 private:
   bool has_socket() const;
 
index 6bdcf2db5c293e2396ca547e309eec6a24058882..e79795a7cd52ebf5fff6ab3d87de37945085e455 100644 (file)
@@ -48,8 +48,7 @@ namespace crimson::net {
 Protocol::Protocol(ChainedDispatchers& dispatchers,
                    SocketConnection& conn)
   : dispatchers(dispatchers),
-    conn(conn),
-    frame_assembler(conn)
+    conn(conn)
 {}
 
 Protocol::~Protocol()
@@ -68,17 +67,17 @@ ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
 
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(frame_assembler.get_buffer(keepalive_frame));
+    bl.append(frame_assembler->get_buffer(keepalive_frame));
   }
 
   if (unlikely(maybe_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
-    bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
+    bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
   }
 
   if (require_ack && num_msgs == 0u) {
     auto ack_frame = AckFrame::Encode(get_in_seq());
-    bl.append(frame_assembler.get_buffer(ack_frame));
+    bl.append(frame_assembler->get_buffer(ack_frame));
   }
 
   std::for_each(
@@ -108,7 +107,7 @@ ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
         msg->get_payload(), msg->get_middle(), msg->get_data());
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(frame_assembler.get_buffer(message));
+    bl.append(frame_assembler->get_buffer(message));
   });
 
   if (!conn.policy.lossy) {
@@ -140,7 +139,8 @@ seastar::future<> Protocol::send_keepalive()
 }
 
 void Protocol::set_out_state(
-    const Protocol::out_state_t &new_state)
+    const Protocol::out_state_t &new_state,
+    FrameAssemblerV2Ref fa)
 {
   ceph_assert_always(!(
     (new_state == out_state_t::none && out_state != out_state_t::none) ||
@@ -149,25 +149,29 @@ void Protocol::set_out_state(
   ));
 
   bool dispatch_in = false;
-  if (out_state != out_state_t::open &&
-      new_state == out_state_t::open) {
+  if (new_state == out_state_t::open) {
     // to open
-    ceph_assert_always(frame_assembler.is_socket_valid());
+    assert(fa != nullptr);
+    ceph_assert_always(frame_assembler == nullptr);
+    frame_assembler = std::move(fa);
+    ceph_assert_always(frame_assembler->is_socket_valid());
     dispatch_in = true;
 #ifdef UNIT_TESTS_BUILT
     if (conn.interceptor) {
       conn.interceptor->register_conn_ready(conn);
     }
 #endif
-  } else if (out_state == out_state_t::open &&
-             new_state != out_state_t::open) {
+  } else if (out_state == out_state_t::open) {
     // from open
-    ceph_assert_always(frame_assembler.is_socket_valid());
-    frame_assembler.shutdown_socket();
+    assert(fa == nullptr);
+    ceph_assert_always(frame_assembler->is_socket_valid());
+    frame_assembler->shutdown_socket();
     if (out_dispatching) {
       ceph_assert_always(!out_exit_dispatching.has_value());
-      out_exit_dispatching = seastar::shared_promise<>();
+      out_exit_dispatching = seastar::promise<>();
     }
+  } else {
+    assert(fa == nullptr);
   }
 
   if (out_state != new_state) {
@@ -176,32 +180,38 @@ void Protocol::set_out_state(
     out_state_changed = seastar::promise<>();
   }
 
-  // The above needs to be atomic
+  /*
+   * not atomic below
+   */
+
   if (dispatch_in) {
     do_in_dispatch();
   }
 }
 
-seastar::future<> Protocol::wait_io_exit_dispatching()
+seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
 {
   ceph_assert_always(out_state != out_state_t::open);
-  ceph_assert_always(!frame_assembler.is_socket_valid());
+  ceph_assert_always(frame_assembler != nullptr);
+  ceph_assert_always(!frame_assembler->is_socket_valid());
   return seastar::when_all(
     [this] {
       if (out_exit_dispatching) {
-        return out_exit_dispatching->get_shared_future();
+        return out_exit_dispatching->get_future();
       } else {
         return seastar::now();
       }
     }(),
     [this] {
       if (in_exit_dispatching) {
-        return in_exit_dispatching->get_shared_future();
+        return in_exit_dispatching->get_future();
       } else {
         return seastar::now();
       }
     }()
-  ).discard_result();
+  ).discard_result().then([this] {
+    return std::move(frame_assembler);
+  });
 }
 
 void Protocol::requeue_out_sent()
@@ -275,7 +285,8 @@ void Protocol::ack_out_sent(seq_num_t seq)
 
 seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
   assert(!is_out_queued());
-  return frame_assembler.flush().then([this] {
+  return frame_assembler->flush(
+  ).then([this] {
     if (!is_out_queued()) {
       // still nothing pending to send after flush,
       // the dispatching can ONLY stop now
@@ -308,7 +319,7 @@ seastar::future<> Protocol::do_out_dispatch()
       auto to_ack = ack_left;
       assert(to_ack == 0 || in_seq > 0);
       // sweep all pending out with the concrete Protocol
-      return frame_assembler.write(
+      return frame_assembler->write(
         sweep_out_pending_msgs_to_sent(
           need_keepalive, next_keepalive_ack, to_ack > 0)
       ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
@@ -408,7 +419,7 @@ void Protocol::notify_out_dispatch()
 seastar::future<>
 Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
 {
-  return frame_assembler.read_frame_payload(
+  return frame_assembler->read_frame_payload(
   ).then([this, throttle_stamp, msg_size](auto payload) {
     if (unlikely(out_state != out_state_t::open)) {
       logger().debug("{} triggered {} during read_message()",
@@ -520,10 +531,10 @@ Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
 void Protocol::do_in_dispatch()
 {
   ceph_assert_always(!in_exit_dispatching.has_value());
-  in_exit_dispatching = seastar::shared_promise<>();
+  in_exit_dispatching = seastar::promise<>();
   gate.dispatch_in_background("do_in_dispatch", *this, [this] {
     return seastar::keep_doing([this] {
-      return frame_assembler.read_main_preamble(
+      return frame_assembler->read_main_preamble(
       ).then([this](auto ret) {
         switch (ret.tag) {
           case Tag::MESSAGE: {
@@ -556,7 +567,7 @@ void Protocol::do_in_dispatch()
             });
           }
           case Tag::ACK:
-            return frame_assembler.read_frame_payload(
+            return frame_assembler->read_frame_payload(
             ).then([this](auto payload) {
               // handle_message_ack() logic
               auto ack = AckFrame::Decode(payload->back());
@@ -564,7 +575,7 @@ void Protocol::do_in_dispatch()
               ack_out_sent(ack.seq());
             });
           case Tag::KEEPALIVE2:
-            return frame_assembler.read_frame_payload(
+            return frame_assembler->read_frame_payload(
             ).then([this](auto payload) {
               // handle_keepalive2() logic
               auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
@@ -577,7 +588,7 @@ void Protocol::do_in_dispatch()
               last_keepalive = seastar::lowres_system_clock::now();
             });
           case Tag::KEEPALIVE2_ACK:
-            return frame_assembler.read_frame_payload(
+            return frame_assembler->read_frame_payload(
             ).then([this](auto payload) {
               // handle_keepalive2_ack() logic
               auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
index 819c7aa95048a4199daf43d430b6ccf3afa94a68..f1e7d2bb0dad30116732e7c93c9b68a2d0b1c439 100644 (file)
@@ -105,9 +105,9 @@ class Protocol {
   };
   friend class fmt::formatter<out_state_t>;
 
-  void set_out_state(const out_state_t &new_state);
+  void set_out_state(const out_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
 
-  seastar::future<> wait_io_exit_dispatching();
+  seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
 
   void requeue_out_sent_up_to(seq_num_t seq);
 
@@ -131,8 +131,6 @@ class Protocol {
 
   SocketConnection &conn;
 
-  FrameAssemblerV2 frame_assembler;
-
  private:
   bool is_out_queued() const {
     return (!out_pending_msgs.empty() ||
@@ -160,6 +158,8 @@ class Protocol {
 
   crimson::common::Gated gate;
 
+  FrameAssemblerV2Ref frame_assembler;
+
   /*
    * out states for writing
    */
@@ -171,10 +171,7 @@ class Protocol {
 
   bool out_dispatching = false;
 
-  // If another continuation is trying to close or replace socket when
-  // out_dispatching is true and out_state is open, it needs to wait for
-  // out_exit_dispatching until writing is stopped or failed.
-  std::optional<seastar::shared_promise<>> out_exit_dispatching;
+  std::optional<seastar::promise<>> out_exit_dispatching;
 
   /// the seq num of the last transmitted message
   seq_num_t out_seq = 0;
@@ -195,7 +192,7 @@ class Protocol {
    * in states for reading
    */
 
-  std::optional<seastar::shared_promise<>> in_exit_dispatching;
+  std::optional<seastar::promise<>> in_exit_dispatching;
 
   /// the seq num of the last received message
   seq_num_t in_seq = 0;
index 2337ecfbc38b5de32cb9851bb9c7da7410d4cb3c..8c3c2e54806a3ca74e98da00333f1ae484cea561 100644 (file)
@@ -160,6 +160,7 @@ ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
                        SocketMessenger& messenger)
   : Protocol(dispatchers, conn),
     messenger{messenger},
+    frame_assembler{FrameAssemblerV2::create(conn)},
     auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
     protocol_timer{conn}
 {}
@@ -198,7 +199,7 @@ void ProtocolV2::start_accept(SocketRef&& new_socket,
   ceph_assert(state == state_t::NONE);
   // until we know better
   conn.target_addr = _peer_addr;
-  frame_assembler.set_socket(std::move(new_socket));
+  frame_assembler->set_socket(std::move(new_socket));
   has_socket = true;
   is_socket_valid = true;
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
@@ -207,22 +208,49 @@ void ProtocolV2::start_accept(SocketRef&& new_socket,
   execute_accepting();
 }
 
-void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t new_state, out_state_t _out_state, bool reentrant)
 {
-  if (!reentrant && _state == state) {
+  if (!reentrant && new_state == state) {
     logger().error("{} is not allowed to re-trigger state {}",
                    conn, get_state_name(state));
     ceph_abort();
   }
   if (state == state_t::CLOSING) {
     logger().error("{} CLOSING is not allowed to trigger state {}",
-                   conn, get_state_name(_state));
+                   conn, get_state_name(new_state));
     ceph_abort();
   }
   logger().debug("{} TRIGGER {}, was {}",
-                 conn, get_state_name(_state), get_state_name(state));
-  state = _state;
-  set_out_state(_out_state);
+                 conn, get_state_name(new_state), get_state_name(state));
+  auto pre_state = state;
+  if (pre_state == state_t::READY) {
+    assert(!gate.is_closed());
+    ceph_assert_always(!exit_io.has_value());
+    exit_io = seastar::shared_promise<>();
+  }
+  state = new_state;
+  if (new_state == state_t::READY) {
+    // I'm not responsible to shutdown the socket at READY
+    is_socket_valid = false;
+    set_out_state(_out_state, std::move(frame_assembler));
+  } else {
+    set_out_state(_out_state, nullptr);
+  }
+
+  /*
+   * not atomic below
+   */
+
+  if (pre_state == state_t::READY) {
+    gate.dispatch_in_background("exit_io", *this, [this] {
+      return wait_io_exit_dispatching(
+      ).then([this](FrameAssemblerV2Ref fa) {
+        frame_assembler = std::move(fa);
+        exit_io->set_value();
+        exit_io = std::nullopt;
+      });
+    });
+  }
 }
 
 void ProtocolV2::fault(
@@ -276,7 +304,7 @@ void ProtocolV2::fault(
   if (likely(has_socket)) {
     if (likely(is_socket_valid)) {
       ceph_assert_always(state != state_t::READY);
-      frame_assembler.shutdown_socket();
+      frame_assembler->shutdown_socket();
       is_socket_valid = false;
     } else {
       ceph_assert_always(state != state_t::ESTABLISHING);
@@ -360,11 +388,11 @@ ProtocolV2::banner_exchange(bool is_connect)
                  CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
   INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
-  return frame_assembler.write_flush(std::move(bl)).then([this] {
+  return frame_assembler->write_flush(std::move(bl)).then([this] {
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
-      return frame_assembler.read_exactly(banner_len); // or read exactly?
+      return frame_assembler->read_exactly(banner_len); // or read exactly?
     }).then([this] (auto bl) {
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
@@ -394,7 +422,7 @@ ProtocolV2::banner_exchange(bool is_connect)
       }
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
-      return frame_assembler.read(payload_len);
+      return frame_assembler->read(payload_len);
     }).then([this, is_connect] (bufferlist bl) {
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
@@ -428,20 +456,20 @@ ProtocolV2::banner_exchange(bool is_connect)
       }
       peer_supported_features = _peer_supported_features;
       bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-      frame_assembler.set_is_rev1(is_rev1);
+      frame_assembler->set_is_rev1(is_rev1);
 
       auto hello = HelloFrame::Encode(messenger.get_mytype(),
                                       conn.target_addr);
       logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
                      conn, ceph_entity_type_name(messenger.get_mytype()),
                      conn.target_addr);
-      return frame_assembler.write_flush_frame(hello);
+      return frame_assembler->write_flush_frame(hello);
     }).then([this] {
       //5. read peer HelloFrame
-      return frame_assembler.read_main_preamble();
+      return frame_assembler->read_main_preamble();
     }).then([this](auto ret) {
       expect_tag(Tag::HELLO, ret.tag, conn, __func__);
-      return frame_assembler.read_frame_payload();
+      return frame_assembler->read_frame_payload();
     }).then([this](auto payload) {
       // 6. process peer HelloFrame
       auto hello = HelloFrame::Decode(payload->back());
@@ -457,11 +485,11 @@ ProtocolV2::banner_exchange(bool is_connect)
 
 seastar::future<> ProtocolV2::handle_auth_reply()
 {
-  return frame_assembler.read_main_preamble(
+  return frame_assembler->read_main_preamble(
   ).then([this](auto ret) {
     switch (ret.tag) {
       case Tag::AUTH_BAD_METHOD:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_auth_bad_method() logic
           auto bad_method = AuthBadMethodFrame::Decode(payload->back());
@@ -482,7 +510,7 @@ seastar::future<> ProtocolV2::handle_auth_reply()
           return client_auth(bad_method.allowed_methods());
         });
       case Tag::AUTH_REPLY_MORE:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_auth_reply_more() logic
           auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
@@ -495,12 +523,12 @@ seastar::future<> ProtocolV2::handle_auth_reply()
           auto more_reply = AuthRequestMoreFrame::Encode(reply);
           logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
                          conn, reply.length());
-          return frame_assembler.write_flush_frame(more_reply);
+          return frame_assembler->write_flush_frame(more_reply);
         }).then([this] {
           return handle_auth_reply();
         });
       case Tag::AUTH_DONE:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_auth_done() logic
           auto auth_done = AuthDoneFrame::Decode(payload->back());
@@ -520,7 +548,7 @@ seastar::future<> ProtocolV2::handle_auth_reply()
             abort_in_fault();
           }
           auth_meta->con_mode = auth_done.con_mode();
-          frame_assembler.create_session_stream_handlers(*auth_meta, false);
+          frame_assembler->create_session_stream_handlers(*auth_meta, false);
           return finish_auth();
         });
       default: {
@@ -544,7 +572,8 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
     logger().debug("{} WRITE AuthRequestFrame: method={},"
                    " preferred_modes={}, payload_len={}",
                    conn, auth_method, preferred_modes, bl.length());
-    return frame_assembler.write_flush_frame(frame).then([this] {
+    return frame_assembler->write_flush_frame(frame
+    ).then([this] {
       return handle_auth_reply();
     });
   } catch (const crimson::auth::error& e) {
@@ -557,7 +586,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::process_wait()
 {
-  return frame_assembler.read_frame_payload(
+  return frame_assembler->read_frame_payload(
   ).then([this](auto payload) {
     // handle_wait() logic
     logger().debug("{} GOT WaitFrame", conn);
@@ -592,12 +621,13 @@ ProtocolV2::client_connect()
                  conn.policy.features_supported,
                  conn.policy.features_required | msgr2_required,
                  flags, client_cookie);
-  return frame_assembler.write_flush_frame(client_ident).then([this] {
-    return frame_assembler.read_main_preamble();
+  return frame_assembler->write_flush_frame(client_ident
+  ).then([this] {
+    return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
     switch (ret.tag) {
       case Tag::IDENT_MISSING_FEATURES:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_ident_missing_features() logic
           auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
@@ -610,7 +640,7 @@ ProtocolV2::client_connect()
       case Tag::WAIT:
         return process_wait();
       case Tag::SERVER_IDENT:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_server_ident() logic
           requeue_out_sent();
@@ -695,12 +725,12 @@ ProtocolV2::client_reconnect()
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
                  global_seq, connect_seq, get_in_seq());
-  return frame_assembler.write_flush_frame(reconnect).then([this] {
-    return frame_assembler.read_main_preamble();
+  return frame_assembler->write_flush_frame(reconnect).then([this] {
+    return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
     switch (ret.tag) {
       case Tag::SESSION_RETRY_GLOBAL:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_session_retry_global() logic
           auto retry = RetryGlobalFrame::Decode(payload->back());
@@ -711,7 +741,7 @@ ProtocolV2::client_reconnect()
           return client_reconnect();
         });
       case Tag::SESSION_RETRY:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_session_retry() logic
           auto retry = RetryFrame::Decode(payload->back());
@@ -722,7 +752,7 @@ ProtocolV2::client_reconnect()
           return client_reconnect();
         });
       case Tag::SESSION_RESET:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} before reset_session()",
@@ -738,7 +768,7 @@ ProtocolV2::client_reconnect()
       case Tag::WAIT:
         return process_wait();
       case Tag::SESSION_RECONNECT_OK:
-        return frame_assembler.read_frame_payload(
+        return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_reconnect_ok() logic
           auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
@@ -771,7 +801,8 @@ void ProtocolV2::execute_connecting()
         assert(server_cookie == 0);
         logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
       }
-      return wait_io_exit_dispatching().then([this] {
+      return wait_exit_io().then([this] {
+          ceph_assert_always(frame_assembler);
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} before Socket::connect()",
                            conn, get_state_name(state));
@@ -789,14 +820,14 @@ void ProtocolV2::execute_connecting()
             });
           }
           if (!has_socket) {
-            frame_assembler.set_socket(std::move(new_socket));
+            frame_assembler->set_socket(std::move(new_socket));
             has_socket = true;
           } else {
             gate.dispatch_in_background(
               "replace_socket_connecting",
               *this,
               [this, new_socket=std::move(new_socket)]() mutable {
-                return frame_assembler.replace_shutdown_socket(std::move(new_socket));
+                return frame_assembler->replace_shutdown_socket(std::move(new_socket));
               }
             );
           }
@@ -804,8 +835,8 @@ void ProtocolV2::execute_connecting()
           return seastar::now();
         }).then([this] {
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          frame_assembler.reset_handlers();
-          frame_assembler.start_recording();
+          frame_assembler->reset_handlers();
+          frame_assembler->start_recording();
           return banner_exchange(true);
         }).then([this] (auto&& ret) {
           auto [_peer_type, _my_addr_from_peer] = std::move(ret);
@@ -820,7 +851,7 @@ void ProtocolV2::execute_connecting()
                            conn, get_state_name(state));
             abort_protocol();
           }
-          frame_assembler.learn_socket_ephemeral_port_as_connector(
+          frame_assembler->learn_socket_ephemeral_port_as_connector(
               _my_addr_from_peer.get_port());
           if (unlikely(_my_addr_from_peer.is_legacy())) {
             logger().warn("{} peer sent a legacy address for me: {}",
@@ -865,7 +896,7 @@ void ProtocolV2::execute_connecting()
            case next_step_t::wait: {
             logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
             ceph_assert_always(is_socket_valid);
-            frame_assembler.shutdown_socket();
+            frame_assembler->shutdown_socket();
             is_socket_valid = false;
             execute_wait(true);
             break;
@@ -894,7 +925,8 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r)
                 "allowed_methods={}, allowed_modes={})",
                 conn, auth_meta->auth_method, cpp_strerror(r),
                 allowed_methods, allowed_modes);
-  return frame_assembler.write_flush_frame(bad_method).then([this] {
+  return frame_assembler->write_flush_frame(bad_method
+  ).then([this] {
     return server_auth();
   });
 }
@@ -920,9 +952,10 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                    conn, conn.peer_global_id,
                    ceph_con_mode_name(auth_meta->con_mode), reply.length());
-    return frame_assembler.write_flush_frame(auth_done).then([this] {
+    return frame_assembler->write_flush_frame(auth_done
+    ).then([this] {
       ceph_assert(auth_meta);
-      frame_assembler.create_session_stream_handlers(*auth_meta, true);
+      frame_assembler->create_session_stream_handlers(*auth_meta, true);
       return finish_auth();
     });
    }
@@ -931,11 +964,12 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     auto more = AuthReplyMoreFrame::Encode(reply);
     logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
                    conn, reply.length());
-    return frame_assembler.write_flush_frame(more).then([this] {
-      return frame_assembler.read_main_preamble();
+    return frame_assembler->write_flush_frame(more
+    ).then([this] {
+      return frame_assembler->read_main_preamble();
     }).then([this](auto ret) {
       expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__);
-      return frame_assembler.read_frame_payload();
+      return frame_assembler->read_frame_payload();
     }).then([this](auto payload) {
       auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
       logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
@@ -957,10 +991,10 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
 
 seastar::future<> ProtocolV2::server_auth()
 {
-  return frame_assembler.read_main_preamble(
+  return frame_assembler->read_main_preamble(
   ).then([this](auto ret) {
     expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__);
-    return frame_assembler.read_frame_payload();
+    return frame_assembler->read_frame_payload();
   }).then([this](auto payload) {
     // handle_auth_request() logic
     auto request = AuthRequestFrame::Decode(payload->back());
@@ -999,7 +1033,8 @@ ProtocolV2::send_wait()
 {
   auto wait = WaitFrame::Encode();
   logger().debug("{} WRITE WaitFrame", conn);
-  return frame_assembler.write_flush_frame(wait).then([] {
+  return frame_assembler->write_flush_frame(wait
+  ).then([] {
     return next_step_t::wait;
   });
 }
@@ -1017,7 +1052,7 @@ ProtocolV2::reuse_connection(
 
   existing_proto->trigger_replacing(reconnect,
                                     do_reset,
-                                    frame_assembler.to_replace(),
+                                    frame_assembler->to_replace(),
                                     std::move(auth_meta),
                                     peer_global_seq,
                                     client_cookie,
@@ -1161,7 +1196,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::server_connect()
 {
-  return frame_assembler.read_frame_payload(
+  return frame_assembler->read_frame_payload(
   ).then([this](auto payload) {
     // handle_client_ident() logic
     auto client_ident = ClientIdentFrame::Decode(payload->back());
@@ -1213,7 +1248,8 @@ ProtocolV2::server_connect()
       auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
       logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
                     conn, feat_missing);
-      return frame_assembler.write_flush_frame(ident_missing_features).then([] {
+      return frame_assembler->write_flush_frame(ident_missing_features
+      ).then([] {
         return next_step_t::wait;
       });
     }
@@ -1251,7 +1287,7 @@ ProtocolV2::server_connect()
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::read_reconnect()
 {
-  return frame_assembler.read_main_preamble(
+  return frame_assembler->read_main_preamble(
   ).then([this](auto ret) {
     expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect");
     return server_reconnect();
@@ -1263,7 +1299,8 @@ ProtocolV2::send_retry(uint64_t connect_seq)
 {
   auto retry = RetryFrame::Encode(connect_seq);
   logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
-  return frame_assembler.write_flush_frame(retry).then([this] {
+  return frame_assembler->write_flush_frame(retry
+  ).then([this] {
     return read_reconnect();
   });
 }
@@ -1273,7 +1310,8 @@ ProtocolV2::send_retry_global(uint64_t global_seq)
 {
   auto retry = RetryGlobalFrame::Encode(global_seq);
   logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
-  return frame_assembler.write_flush_frame(retry).then([this] {
+  return frame_assembler->write_flush_frame(retry
+  ).then([this] {
     return read_reconnect();
   });
 }
@@ -1283,8 +1321,9 @@ ProtocolV2::send_reset(bool full)
 {
   auto reset = ResetFrame::Encode(full);
   logger().warn("{} WRITE ResetFrame: full={}", conn, full);
-  return frame_assembler.write_flush_frame(reset).then([this] {
-    return frame_assembler.read_main_preamble();
+  return frame_assembler->write_flush_frame(reset
+  ).then([this] {
+    return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
     expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
     return server_connect();
@@ -1294,7 +1333,7 @@ ProtocolV2::send_reset(bool full)
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::server_reconnect()
 {
-  return frame_assembler.read_frame_payload(
+  return frame_assembler->read_frame_payload(
   ).then([this](auto payload) {
     // handle_reconnect() logic
     auto reconnect = ReconnectFrame::Decode(payload->back());
@@ -1443,8 +1482,8 @@ void ProtocolV2::execute_accepting()
       return seastar::futurize_invoke([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          frame_assembler.reset_handlers();
-          frame_assembler.start_recording();
+          frame_assembler->reset_handlers();
+          frame_assembler->start_recording();
           return banner_exchange(false);
         }).then([this] (auto&& ret) {
           auto [_peer_type, _my_addr_from_peer] = std::move(ret);
@@ -1468,7 +1507,7 @@ void ProtocolV2::execute_accepting()
           messenger.learned_addr(_my_addr_from_peer, conn);
           return server_auth();
         }).then([this] {
-          return frame_assembler.read_main_preamble();
+          return frame_assembler->read_main_preamble();
         }).then([this](auto ret) {
           switch (ret.tag) {
             case Tag::CLIENT_IDENT:
@@ -1511,16 +1550,17 @@ seastar::future<> ProtocolV2::finish_auth()
 {
   ceph_assert(auth_meta);
 
-  auto records = frame_assembler.stop_recording();
+  auto records = frame_assembler->stop_recording();
   const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
     auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
   auto sig_frame = AuthSignatureFrame::Encode(sig);
   logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
-  return frame_assembler.write_flush_frame(sig_frame).then([this] {
-    return frame_assembler.read_main_preamble();
+  return frame_assembler->write_flush_frame(sig_frame
+  ).then([this] {
+    return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
     expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
-    return frame_assembler.read_frame_payload();
+    return frame_assembler->read_frame_payload();
   }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
     // handle_auth_signature() logic
     auto sig_frame = AuthSignatureFrame::Decode(payload->back());
@@ -1634,7 +1674,7 @@ ProtocolV2::send_server_ident()
                  conn.policy.features_required | msgr2_required,
                  flags, server_cookie);
 
-  return frame_assembler.write_flush_frame(server_ident);
+  return frame_assembler->write_flush_frame(server_ident);
 }
 
 // REPLACING state
@@ -1655,7 +1695,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   ceph_assert_always(has_socket);
   ceph_assert_always(!mover.socket->is_shutdown());
   if (is_socket_valid) {
-    frame_assembler.shutdown_socket();
+    frame_assembler->shutdown_socket();
     is_socket_valid = false;
   }
   gate.dispatch_in_background("trigger_replacing", *this,
@@ -1672,8 +1712,9 @@ void ProtocolV2::trigger_replacing(bool reconnect,
     dispatchers.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
     // state may become CLOSING, close mover.socket and abort later
-    return wait_io_exit_dispatching(
+    return wait_exit_io(
     ).then([this] {
+      ceph_assert_always(frame_assembler);
       protocol_timer.cancel();
       auto done = std::move(execution_done);
       execution_done = seastar::now();
@@ -1704,7 +1745,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         "replace_frame_assembler",
         *this,
         [this, mover=std::move(mover)]() mutable {
-          return frame_assembler.replace_by(std::move(mover));
+          return frame_assembler->replace_by(std::move(mover));
         }
       );
       is_socket_valid = true;
@@ -1715,7 +1756,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         requeue_out_sent_up_to(new_msg_seq);
         auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
         logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
-        return frame_assembler.write_flush_frame(reconnect_ok);
+        return frame_assembler->write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
         assert(conn.get_peer_type() == new_peer_name.type());
@@ -1725,7 +1766,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         conn.set_features(new_conn_features);
         peer_supported_features = new_peer_supported_features;
         bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-        frame_assembler.set_is_rev1(is_rev1);
+        frame_assembler->set_is_rev1(is_rev1);
         return send_server_ident();
       }
     }).then([this, reconnect] {
@@ -1759,8 +1800,6 @@ void ProtocolV2::execute_ready()
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   protocol_timer.cancel();
   ceph_assert_always(is_socket_valid);
-  // I'm not responsible to shutdown the socket at READY
-  is_socket_valid = false;
   trigger_state(state_t::READY, out_state_t::open, false);
 }
 
@@ -1820,7 +1859,8 @@ void ProtocolV2::execute_server_wait()
   ceph_assert_always(is_socket_valid);
   trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
   gated_execute("execute_server_wait", [this] {
-    return frame_assembler.read_exactly(1).then([this] (auto bl) {
+    return frame_assembler->read_exactly(1
+    ).then([this](auto bl) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
     }).handle_exception([this] (std::exception_ptr eptr) {
@@ -1899,7 +1939,7 @@ void ProtocolV2::do_close(
     (*f_accept_new)();
   }
   if (is_socket_valid) {
-    frame_assembler.shutdown_socket();
+    frame_assembler->shutdown_socket();
     is_socket_valid = false;
   }
   assert(!gate.is_closed());
@@ -1917,8 +1957,10 @@ void ProtocolV2::do_close(
   closed_clean_fut = seastar::when_all(
       std::move(gate_closed), std::move(out_closed)
   ).discard_result().then([this] {
+    ceph_assert_always(!exit_io.has_value());
     if (has_socket) {
-      return frame_assembler.close_shutdown_socket();
+      ceph_assert_always(frame_assembler);
+      return frame_assembler->close_shutdown_socket();
     } else {
       return seastar::now();
     }
index 206af1213d5e3746984da282fc817c34ee71c59e..0b915540adccc6544866825f8a7cf9c33baa7771 100644 (file)
@@ -49,6 +49,14 @@ class ProtocolV2 final : public Protocol {
 
   void notify_out_fault(const char *, std::exception_ptr) override;
 
+  seastar::future<> wait_exit_io() {
+    if (exit_io.has_value()) {
+      return exit_io->get_shared_future();
+    } else {
+      return seastar::now();
+    }
+  }
+
  private:
   SocketMessenger &messenger;
 
@@ -57,6 +65,10 @@ class ProtocolV2 final : public Protocol {
   // the socket exists and it is not shutdown
   bool is_socket_valid = false;
 
+  FrameAssemblerV2Ref frame_assembler;
+
+  std::optional<seastar::shared_promise<>> exit_io;
+
   AuthConnectionMetaRef auth_meta;
 
   crimson::common::Gated gate;