From: Yingxin Cheng Date: Thu, 1 Dec 2022 02:36:35 +0000 (+0800) Subject: crimson/net: move message read path from ProtocolV2 to Protocol X-Git-Tag: v18.1.0~375^2~17 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e1393ad102063b6b74fbee53506f9de4b6901670;p=ceph-ci.git crimson/net: move message read path from ProtocolV2 to Protocol Also move socket shutdown ownership to Protocol at READY/open. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 3ec265d4b23..38b0217631f 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -5,6 +5,7 @@ #include "auth/Auth.h" +#include "crimson/common/formatter.h" #include "crimson/common/log.h" #include "crimson/net/Errors.h" #include "crimson/net/chained_dispatchers.h" @@ -12,12 +13,34 @@ #include "crimson/net/SocketMessenger.h" #include "msg/Message.h" +using namespace ceph::msgr::v2; +using crimson::common::local_conf; + namespace { seastar::logger& logger() { return crimson::get_logger(ceph_subsys_ms); } +[[noreturn]] void abort_in_fault() { + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); +} + +[[noreturn]] void abort_protocol() { + throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); +} + +std::size_t get_msg_size(const FrameAssembler &rx_frame_asm) +{ + ceph_assert(rx_frame_asm.get_num_segments() > 0); + size_t sum = 0; + // we don't include SegmentIndex::Msg::HEADER. + for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { + sum += rx_frame_asm.get_segment_logical_len(idx); + } + return sum; +} + } // namespace anonymous namespace crimson::net { @@ -125,13 +148,22 @@ void Protocol::set_out_state( (new_state != out_state_t::drop && out_state == out_state_t::drop) )); + bool dispatch_in = false; if (out_state != out_state_t::open && new_state == out_state_t::open) { // to open ceph_assert_always(frame_assembler.is_socket_valid()); + dispatch_in = true; +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_ready(conn); + } +#endif } else if (out_state == out_state_t::open && new_state != out_state_t::open) { // from open + ceph_assert_always(frame_assembler.is_socket_valid()); + frame_assembler.shutdown_socket(); if (out_dispatching) { ceph_assert_always(!out_exit_dispatching.has_value()); out_exit_dispatching = seastar::shared_promise<>(); @@ -143,21 +175,33 @@ void Protocol::set_out_state( out_state_changed.set_value(); out_state_changed = seastar::shared_promise<>(); } -} -void Protocol::notify_keepalive_ack(utime_t keepalive_ack) -{ - logger().trace("{} got keepalive ack {}", conn, keepalive_ack); - next_keepalive_ack = keepalive_ack; - notify_out_dispatch(); + // The above needs to be atomic + if (dispatch_in) { + do_in_dispatch(); + } } -void Protocol::notify_ack() +seastar::future<> Protocol::wait_io_exit_dispatching() { - if (!conn.policy.lossy) { - ++ack_left; - notify_out_dispatch(); - } + ceph_assert_always(out_state != out_state_t::open); + ceph_assert_always(!frame_assembler.is_socket_valid()); + return seastar::when_all( + [this] { + if (out_exit_dispatching) { + return out_exit_dispatching->get_shared_future(); + } else { + return seastar::now(); + } + }(), + [this] { + if (in_exit_dispatching) { + return in_exit_dispatching->get_shared_future(); + } else { + return seastar::now(); + } + }() + ).discard_result(); } void Protocol::requeue_out_sent() @@ -326,7 +370,7 @@ seastar::future<> Protocol::do_out_dispatch() eptr = std::current_exception(); } set_out_state(out_state_t::delay); - notify_out_fault(eptr); + notify_out_fault("do_out_dispatch", eptr); } else { logger().info("{} do_out_dispatch(): fault at {} -- {}", conn, out_state, e); @@ -361,4 +405,218 @@ void Protocol::notify_out_dispatch() } } +seastar::future<> +Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size) +{ + return frame_assembler.read_frame_payload( + ).then([this, throttle_stamp, msg_size](auto payload) { + if (unlikely(out_state != out_state_t::open)) { + logger().debug("{} triggered {} during read_message()", + conn, out_state); + abort_protocol(); + } + + utime_t recv_stamp{seastar::lowres_system_clock::now()}; + + // we need to get the size before std::moving segments data + auto msg_frame = MessageFrame::Decode(*payload); + // XXX: paranoid copy just to avoid oops + ceph_msg_header2 current_header = msg_frame.header(); + + logger().trace("{} got {} + {} + {} byte message," + " envelope type={} src={} off={} seq={}", + conn, msg_frame.front_len(), msg_frame.middle_len(), + msg_frame.data_len(), current_header.type, conn.get_peer_name(), + current_header.data_off, current_header.seq); + + ceph_msg_header header{current_header.seq, + current_header.tid, + current_header.type, + current_header.priority, + current_header.version, + ceph_le32(msg_frame.front_len()), + ceph_le32(msg_frame.middle_len()), + ceph_le32(msg_frame.data_len()), + current_header.data_off, + conn.get_peer_name(), + current_header.compat_version, + current_header.reserved, + ceph_le32(0)}; + ceph_msg_footer footer{ceph_le32(0), ceph_le32(0), + ceph_le32(0), ceph_le64(0), current_header.flags}; + + auto conn_ref = seastar::static_pointer_cast( + conn.shared_from_this()); + Message *message = decode_message(nullptr, 0, header, footer, + msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); + if (!message) { + logger().warn("{} decode message failed", conn); + abort_in_fault(); + } + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + message->set_dispatch_throttle_size(msg_size); + + message->set_throttle_stamp(throttle_stamp); + message->set_recv_stamp(recv_stamp); + message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()}); + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the + // client side queueing because messages can't be renumbered, but the (kernel) + // client will occasionally pull a message out of the sent queue to send + // elsewhere. in that case it doesn't matter if we "got" it or not. + uint64_t cur_seq = get_in_seq(); + if (message->get_seq() <= cur_seq) { + logger().error("{} got old message {} <= {} {}, discarding", + conn, message->get_seq(), cur_seq, *message); + if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) && + local_conf()->ms_die_on_old_message) { + ceph_assert(0 == "old msgs despite reconnect_seq feature"); + } + return seastar::now(); + } else if (message->get_seq() > cur_seq + 1) { + logger().error("{} missed message? skipped from seq {} to {}", + conn, cur_seq, message->get_seq()); + if (local_conf()->ms_die_on_skipped_message) { + ceph_assert(0 == "skipped incoming seq"); + } + } + + // note last received message. + in_seq = message->get_seq(); + if (conn.policy.lossy) { + logger().debug("{} <== #{} === {} ({})", + conn, + message->get_seq(), + *message, + message->get_type()); + } else { + logger().debug("{} <== #{},{} === {} ({})", + conn, + message->get_seq(), + current_header.ack_seq, + *message, + message->get_type()); + } + + // notify ack + if (!conn.policy.lossy) { + ++ack_left; + notify_out_dispatch(); + } + + ack_out_sent(current_header.ack_seq); + + // TODO: change MessageRef with seastar::shared_ptr + auto msg_ref = MessageRef{message, false}; + assert(out_state == out_state_t::open); + // throttle the reading process by the returned future + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); + }); +} + +void Protocol::do_in_dispatch() +{ + ceph_assert_always(!in_exit_dispatching.has_value()); + in_exit_dispatching = seastar::shared_promise<>(); + gate.dispatch_in_background("do_in_dispatch", *this, [this] { + return seastar::keep_doing([this] { + return frame_assembler.read_main_preamble( + ).then([this](auto ret) { + switch (ret.tag) { + case Tag::MESSAGE: { + size_t msg_size = get_msg_size(*ret.rx_frame_asm); + return seastar::futurize_invoke([this] { + // throttle_message() logic + if (!conn.policy.throttler_messages) { + return seastar::now(); + } + // TODO: message throttler + ceph_assert(false); + return seastar::now(); + }).then([this, msg_size] { + // throttle_bytes() logic + if (!conn.policy.throttler_bytes) { + return seastar::now(); + } + if (!msg_size) { + return seastar::now(); + } + logger().trace("{} wants {} bytes from policy throttler {}/{}", + conn, msg_size, + conn.policy.throttler_bytes->get_current(), + conn.policy.throttler_bytes->get_max()); + return conn.policy.throttler_bytes->get(msg_size); + }).then([this, msg_size] { + // TODO: throttle_dispatch_queue() logic + utime_t throttle_stamp{seastar::lowres_system_clock::now()}; + return read_message(throttle_stamp, msg_size); + }); + } + case Tag::ACK: + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { + // handle_message_ack() logic + auto ack = AckFrame::Decode(payload->back()); + logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); + ack_out_sent(ack.seq()); + }); + case Tag::KEEPALIVE2: + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { + // handle_keepalive2() logic + auto keepalive_frame = KeepAliveFrame::Decode(payload->back()); + logger().debug("{} GOT KeepAliveFrame: timestamp={}", + conn, keepalive_frame.timestamp()); + // notify keepalive ack + next_keepalive_ack = keepalive_frame.timestamp(); + notify_out_dispatch(); + + last_keepalive = seastar::lowres_system_clock::now(); + }); + case Tag::KEEPALIVE2_ACK: + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { + // handle_keepalive2_ack() logic + auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back()); + auto _last_keepalive_ack = + seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}; + set_last_keepalive_ack(_last_keepalive_ack); + logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", + conn, _last_keepalive_ack); + }); + default: { + logger().warn("{} do_in_dispatch() received unexpected tag: {}", + conn, static_cast(ret.tag)); + abort_in_fault(); + } + } + }); + }).handle_exception([this](std::exception_ptr eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + + if (out_state == out_state_t::open) { + logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}", + conn, out_state, e_what); + set_out_state(out_state_t::delay); + notify_out_fault("do_in_dispatch", eptr); + } else { + logger().info("{} do_in_dispatch(): fault at {} -- {}", + conn, out_state, e_what); + } + }).finally([this] { + ceph_assert_always(in_exit_dispatching.has_value()); + in_exit_dispatching->set_value(); + in_exit_dispatching = std::nullopt; + }); + }); +} + } // namespace crimson::net diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 4c82d9847db..5957956900b 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -47,7 +47,7 @@ class Protocol { virtual void notify_out() = 0; - virtual void notify_out_fault(std::exception_ptr) = 0; + virtual void notify_out_fault(const char *where, std::exception_ptr) = 0; // the write state-machine public: @@ -107,16 +107,7 @@ class Protocol { void set_out_state(const out_state_t &new_state); - seastar::future<> wait_out_exit_dispatching() { - if (out_exit_dispatching) { - return out_exit_dispatching->get_shared_future(); - } - return seastar::now(); - } - - void notify_keepalive_ack(utime_t keepalive_ack); - - void notify_ack(); + seastar::future<> wait_io_exit_dispatching(); void requeue_out_sent_up_to(seq_num_t seq); @@ -132,20 +123,10 @@ class Protocol { return is_out_queued() || !out_sent_msgs.empty(); } - void ack_out_sent(seq_num_t seq); - - void set_last_keepalive(clock_t::time_point when) { - last_keepalive = when; - } - seq_num_t get_in_seq() const { return in_seq; } - void set_in_seq(seq_num_t _in_seq) { - in_seq = _in_seq; - } - ChainedDispatchers& dispatchers; SocketConnection &conn; @@ -171,6 +152,12 @@ class Protocol { void notify_out_dispatch(); + void ack_out_sent(seq_num_t seq); + + seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size); + + void do_in_dispatch(); + crimson::common::Gated gate; /* @@ -208,6 +195,8 @@ class Protocol { * in states for reading */ + std::optional> in_exit_dispatching; + /// the seq num of the last received message seq_num_t in_seq = 0; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 8bce2ee6a84..2337ecfbc38 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -3,7 +3,6 @@ #include "ProtocolV2.h" -#include #include #include #include "include/msgr.h" @@ -99,17 +98,6 @@ inline uint64_t generate_client_cookie() { 1, std::numeric_limits::max()); } -std::size_t get_msg_size(const FrameAssembler &rx_frame_asm) -{ - ceph_assert(rx_frame_asm.get_num_segments() > 0); - size_t sum = 0; - // we don't include SegmentIndex::Msg::HEADER. - for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { - sum += rx_frame_asm.get_segment_logical_len(idx); - } - return sum; -} - } // namespace anonymous namespace fmt { @@ -287,11 +275,11 @@ void ProtocolV2::fault( if (likely(has_socket)) { if (likely(is_socket_valid)) { + ceph_assert_always(state != state_t::READY); frame_assembler.shutdown_socket(); is_socket_valid = false; } else { - ceph_assert_always(state == state_t::CONNECTING || - state == state_t::REPLACING); + ceph_assert_always(state != state_t::ESTABLISHING); } } else { // !has_socket ceph_assert_always(state == state_t::CONNECTING); @@ -783,10 +771,7 @@ void ProtocolV2::execute_connecting() assert(server_cookie == 0); logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); } - return seastar::when_all( - wait_out_exit_dispatching(), - wait_in_exit_dispatching() - ).discard_result().then([this] { + return wait_io_exit_dispatching().then([this] { if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} before Socket::connect()", conn, get_state_name(state)); @@ -1687,10 +1672,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); // state may become CLOSING, close mover.socket and abort later - return seastar::when_all( - wait_out_exit_dispatching(), - wait_in_exit_dispatching() - ).discard_result().then([this] { + return wait_io_exit_dispatching( + ).then([this] { protocol_timer.cancel(); auto done = std::move(execution_done); execution_done = seastar::now(); @@ -1766,206 +1749,19 @@ void ProtocolV2::trigger_replacing(bool reconnect, // READY state -void ProtocolV2::notify_out_fault(std::exception_ptr eptr) -{ - fault(state_t::READY, "notify_out_fault", eptr); -} - -seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size) +void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr) { - return frame_assembler.read_frame_payload( - ).then([this, throttle_stamp, msg_size](auto payload) { - if (unlikely(state != state_t::READY)) { - logger().debug("{} triggered {} during read_message()", - conn, get_state_name(state)); - abort_protocol(); - } - - utime_t recv_stamp{seastar::lowres_system_clock::now()}; - - // we need to get the size before std::moving segments data - auto msg_frame = MessageFrame::Decode(*payload); - // XXX: paranoid copy just to avoid oops - ceph_msg_header2 current_header = msg_frame.header(); - - logger().trace("{} got {} + {} + {} byte message," - " envelope type={} src={} off={} seq={}", - conn, msg_frame.front_len(), msg_frame.middle_len(), - msg_frame.data_len(), current_header.type, conn.get_peer_name(), - current_header.data_off, current_header.seq); - - ceph_msg_header header{current_header.seq, - current_header.tid, - current_header.type, - current_header.priority, - current_header.version, - ceph_le32(msg_frame.front_len()), - ceph_le32(msg_frame.middle_len()), - ceph_le32(msg_frame.data_len()), - current_header.data_off, - conn.get_peer_name(), - current_header.compat_version, - current_header.reserved, - ceph_le32(0)}; - ceph_msg_footer footer{ceph_le32(0), ceph_le32(0), - ceph_le32(0), ceph_le64(0), current_header.flags}; - - auto conn_ref = seastar::static_pointer_cast( - conn.shared_from_this()); - Message *message = decode_message(nullptr, 0, header, footer, - msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); - if (!message) { - logger().warn("{} decode message failed", conn); - abort_in_fault(); - } - - // store reservation size in message, so we don't get confused - // by messages entering the dispatch queue through other paths. - message->set_dispatch_throttle_size(msg_size); - - message->set_throttle_stamp(throttle_stamp); - message->set_recv_stamp(recv_stamp); - message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()}); - - // check received seq#. if it is old, drop the message. - // note that incoming messages may skip ahead. this is convenient for the - // client side queueing because messages can't be renumbered, but the (kernel) - // client will occasionally pull a message out of the sent queue to send - // elsewhere. in that case it doesn't matter if we "got" it or not. - uint64_t cur_seq = get_in_seq(); - if (message->get_seq() <= cur_seq) { - logger().error("{} got old message {} <= {} {}, discarding", - conn, message->get_seq(), cur_seq, *message); - if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) && - local_conf()->ms_die_on_old_message) { - ceph_assert(0 == "old msgs despite reconnect_seq feature"); - } - return seastar::now(); - } else if (message->get_seq() > cur_seq + 1) { - logger().error("{} missed message? skipped from seq {} to {}", - conn, cur_seq, message->get_seq()); - if (local_conf()->ms_die_on_skipped_message) { - ceph_assert(0 == "skipped incoming seq"); - } - } - - // note last received message. - set_in_seq(message->get_seq()); - if (conn.policy.lossy) { - logger().debug("{} <== #{} === {} ({})", - conn, - message->get_seq(), - *message, - message->get_type()); - } else { - logger().debug("{} <== #{},{} === {} ({})", - conn, - message->get_seq(), - current_header.ack_seq, - *message, - message->get_type()); - } - notify_ack(); - ack_out_sent(current_header.ack_seq); - - // TODO: change MessageRef with seastar::shared_ptr - auto msg_ref = MessageRef{message, false}; - assert(state == state_t::READY); - // throttle the reading process by the returned future - return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); - }); + fault(state_t::READY, where, eptr); } void ProtocolV2::execute_ready() { - ceph_assert_always(is_socket_valid); assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); + protocol_timer.cancel(); + ceph_assert_always(is_socket_valid); + // I'm not responsible to shutdown the socket at READY + is_socket_valid = false; trigger_state(state_t::READY, out_state_t::open, false); -#ifdef UNIT_TESTS_BUILT - if (conn.interceptor) { - conn.interceptor->register_conn_ready(conn); - } -#endif - ceph_assert_always(!in_exit_dispatching.has_value()); - in_exit_dispatching = seastar::shared_promise<>(); - gate.dispatch_in_background("execute_ready", *this, [this] { - protocol_timer.cancel(); - return seastar::keep_doing([this] { - return frame_assembler.read_main_preamble( - ).then([this](auto ret) { - switch (ret.tag) { - case Tag::MESSAGE: { - size_t msg_size = get_msg_size(*ret.rx_frame_asm); - return seastar::futurize_invoke([this] { - // throttle_message() logic - if (!conn.policy.throttler_messages) { - return seastar::now(); - } - // TODO: message throttler - ceph_assert(false); - return seastar::now(); - }).then([this, msg_size] { - // throttle_bytes() logic - if (!conn.policy.throttler_bytes) { - return seastar::now(); - } - if (!msg_size) { - return seastar::now(); - } - logger().trace("{} wants {} bytes from policy throttler {}/{}", - conn, msg_size, - conn.policy.throttler_bytes->get_current(), - conn.policy.throttler_bytes->get_max()); - return conn.policy.throttler_bytes->get(msg_size); - }).then([this, msg_size] { - // TODO: throttle_dispatch_queue() logic - utime_t throttle_stamp{seastar::lowres_system_clock::now()}; - return read_message(throttle_stamp, msg_size); - }); - } - case Tag::ACK: - return frame_assembler.read_frame_payload( - ).then([this](auto payload) { - // handle_message_ack() logic - auto ack = AckFrame::Decode(payload->back()); - logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); - ack_out_sent(ack.seq()); - }); - case Tag::KEEPALIVE2: - return frame_assembler.read_frame_payload( - ).then([this](auto payload) { - // handle_keepalive2() logic - auto keepalive_frame = KeepAliveFrame::Decode(payload->back()); - logger().debug("{} GOT KeepAliveFrame: timestamp={}", - conn, keepalive_frame.timestamp()); - notify_keepalive_ack(keepalive_frame.timestamp()); - set_last_keepalive(seastar::lowres_system_clock::now()); - }); - case Tag::KEEPALIVE2_ACK: - return frame_assembler.read_frame_payload( - ).then([this](auto payload) { - // handle_keepalive2_ack() logic - auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back()); - auto _last_keepalive_ack = - seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}; - set_last_keepalive_ack(_last_keepalive_ack); - logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", - conn, _last_keepalive_ack); - }); - default: { - unexpected_tag(ret.tag, conn, "execute_ready"); - return seastar::now(); - } - } - }); - }).handle_exception([this](std::exception_ptr eptr) { - fault(state_t::READY, "execute_ready", eptr); - }).finally([this] { - ceph_assert_always(in_exit_dispatching.has_value()); - in_exit_dispatching->set_value(); - in_exit_dispatching = std::nullopt; - }); - }); } // STANDBY state diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 5c7d369bb2a..206af1213d5 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -47,7 +47,7 @@ class ProtocolV2 final : public Protocol { private: void notify_out() override; - void notify_out_fault(std::exception_ptr) override; + void notify_out_fault(const char *, std::exception_ptr) override; private: SocketMessenger &messenger; @@ -108,14 +108,6 @@ class ProtocolV2 final : public Protocol { uint64_t peer_global_seq = 0; uint64_t connect_seq = 0; - std::optional> in_exit_dispatching; - seastar::future<> wait_in_exit_dispatching() { - if (in_exit_dispatching.has_value()) { - return in_exit_dispatching->get_shared_future(); - } - return seastar::now(); - } - seastar::future<> execution_done = seastar::now(); template @@ -235,7 +227,6 @@ class ProtocolV2 final : public Protocol { uint64_t new_msg_seq); // READY - seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size); void execute_ready(); // STANDBY