Also move socket shutdown ownership to Protocol at READY/open.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
#include "auth/Auth.h"
+#include "crimson/common/formatter.h"
#include "crimson/common/log.h"
#include "crimson/net/Errors.h"
#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/SocketMessenger.h"
#include "msg/Message.h"
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_ms);
}
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
+ }
+ return sum;
+}
+
} // namespace anonymous
namespace crimson::net {
(new_state != out_state_t::drop && out_state == out_state_t::drop)
));
+ bool dispatch_in = false;
if (out_state != out_state_t::open &&
new_state == out_state_t::open) {
// to open
ceph_assert_always(frame_assembler.is_socket_valid());
+ dispatch_in = true;
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_ready(conn);
+ }
+#endif
} else if (out_state == out_state_t::open &&
new_state != out_state_t::open) {
// from open
+ ceph_assert_always(frame_assembler.is_socket_valid());
+ frame_assembler.shutdown_socket();
if (out_dispatching) {
ceph_assert_always(!out_exit_dispatching.has_value());
out_exit_dispatching = seastar::shared_promise<>();
out_state_changed.set_value();
out_state_changed = seastar::shared_promise<>();
}
-}
-void Protocol::notify_keepalive_ack(utime_t keepalive_ack)
-{
- logger().trace("{} got keepalive ack {}", conn, keepalive_ack);
- next_keepalive_ack = keepalive_ack;
- notify_out_dispatch();
+ // The above needs to be atomic
+ if (dispatch_in) {
+ do_in_dispatch();
+ }
}
-void Protocol::notify_ack()
+seastar::future<> Protocol::wait_io_exit_dispatching()
{
- if (!conn.policy.lossy) {
- ++ack_left;
- notify_out_dispatch();
- }
+ ceph_assert_always(out_state != out_state_t::open);
+ ceph_assert_always(!frame_assembler.is_socket_valid());
+ return seastar::when_all(
+ [this] {
+ if (out_exit_dispatching) {
+ return out_exit_dispatching->get_shared_future();
+ } else {
+ return seastar::now();
+ }
+ }(),
+ [this] {
+ if (in_exit_dispatching) {
+ return in_exit_dispatching->get_shared_future();
+ } else {
+ return seastar::now();
+ }
+ }()
+ ).discard_result();
}
void Protocol::requeue_out_sent()
eptr = std::current_exception();
}
set_out_state(out_state_t::delay);
- notify_out_fault(eptr);
+ notify_out_fault("do_out_dispatch", eptr);
} else {
logger().info("{} do_out_dispatch(): fault at {} -- {}",
conn, out_state, e);
}
}
+seastar::future<>
+Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
+{
+ return frame_assembler.read_frame_payload(
+ ).then([this, throttle_stamp, msg_size](auto payload) {
+ if (unlikely(out_state != out_state_t::open)) {
+ logger().debug("{} triggered {} during read_message()",
+ conn, out_state);
+ abort_protocol();
+ }
+
+ utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+ // we need to get the size before std::moving segments data
+ auto msg_frame = MessageFrame::Decode(*payload);
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
+
+ logger().trace("{} got {} + {} + {} byte message,"
+ " envelope type={} src={} off={} seq={}",
+ conn, msg_frame.front_len(), msg_frame.middle_len(),
+ msg_frame.data_len(), current_header.type, conn.get_peer_name(),
+ current_header.data_off, current_header.seq);
+
+ ceph_msg_header header{current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
+ current_header.data_off,
+ conn.get_peer_name(),
+ current_header.compat_version,
+ current_header.reserved,
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
+
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this());
+ Message *message = decode_message(nullptr, 0, header, footer,
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+ if (!message) {
+ logger().warn("{} decode message failed", conn);
+ abort_in_fault();
+ }
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(msg_size);
+
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_stamp(recv_stamp);
+ message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = get_in_seq();
+ if (message->get_seq() <= cur_seq) {
+ logger().error("{} got old message {} <= {} {}, discarding",
+ conn, message->get_seq(), cur_seq, *message);
+ if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+ local_conf()->ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return seastar::now();
+ } else if (message->get_seq() > cur_seq + 1) {
+ logger().error("{} missed message? skipped from seq {} to {}",
+ conn, cur_seq, message->get_seq());
+ if (local_conf()->ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ // note last received message.
+ in_seq = message->get_seq();
+ if (conn.policy.lossy) {
+ logger().debug("{} <== #{} === {} ({})",
+ conn,
+ message->get_seq(),
+ *message,
+ message->get_type());
+ } else {
+ logger().debug("{} <== #{},{} === {} ({})",
+ conn,
+ message->get_seq(),
+ current_header.ack_seq,
+ *message,
+ message->get_type());
+ }
+
+ // notify ack
+ if (!conn.policy.lossy) {
+ ++ack_left;
+ notify_out_dispatch();
+ }
+
+ ack_out_sent(current_header.ack_seq);
+
+ // TODO: change MessageRef with seastar::shared_ptr
+ auto msg_ref = MessageRef{message, false};
+ assert(out_state == out_state_t::open);
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ });
+}
+
+void Protocol::do_in_dispatch()
+{
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::shared_promise<>();
+ gate.dispatch_in_background("do_in_dispatch", *this, [this] {
+ return seastar::keep_doing([this] {
+ return frame_assembler.read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::MESSAGE: {
+ size_t msg_size = get_msg_size(*ret.rx_frame_asm);
+ return seastar::futurize_invoke([this] {
+ // throttle_message() logic
+ if (!conn.policy.throttler_messages) {
+ return seastar::now();
+ }
+ // TODO: message throttler
+ ceph_assert(false);
+ return seastar::now();
+ }).then([this, msg_size] {
+ // throttle_bytes() logic
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ if (!msg_size) {
+ return seastar::now();
+ }
+ logger().trace("{} wants {} bytes from policy throttler {}/{}",
+ conn, msg_size,
+ conn.policy.throttler_bytes->get_current(),
+ conn.policy.throttler_bytes->get_max());
+ return conn.policy.throttler_bytes->get(msg_size);
+ }).then([this, msg_size] {
+ // TODO: throttle_dispatch_queue() logic
+ utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+ return read_message(throttle_stamp, msg_size);
+ });
+ }
+ case Tag::ACK:
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_message_ack() logic
+ auto ack = AckFrame::Decode(payload->back());
+ logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+ ack_out_sent(ack.seq());
+ });
+ case Tag::KEEPALIVE2:
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_keepalive2() logic
+ auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
+ logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+ conn, keepalive_frame.timestamp());
+ // notify keepalive ack
+ next_keepalive_ack = keepalive_frame.timestamp();
+ notify_out_dispatch();
+
+ last_keepalive = seastar::lowres_system_clock::now();
+ });
+ case Tag::KEEPALIVE2_ACK:
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_keepalive2_ack() logic
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
+ auto _last_keepalive_ack =
+ seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+ set_last_keepalive_ack(_last_keepalive_ack);
+ logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+ conn, _last_keepalive_ack);
+ });
+ default: {
+ logger().warn("{} do_in_dispatch() received unexpected tag: {}",
+ conn, static_cast<uint32_t>(ret.tag));
+ abort_in_fault();
+ }
+ }
+ });
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+
+ if (out_state == out_state_t::open) {
+ logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
+ conn, out_state, e_what);
+ set_out_state(out_state_t::delay);
+ notify_out_fault("do_in_dispatch", eptr);
+ } else {
+ logger().info("{} do_in_dispatch(): fault at {} -- {}",
+ conn, out_state, e_what);
+ }
+ }).finally([this] {
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
+ });
+ });
+}
+
} // namespace crimson::net
virtual void notify_out() = 0;
- virtual void notify_out_fault(std::exception_ptr) = 0;
+ virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
// the write state-machine
public:
void set_out_state(const out_state_t &new_state);
- seastar::future<> wait_out_exit_dispatching() {
- if (out_exit_dispatching) {
- return out_exit_dispatching->get_shared_future();
- }
- return seastar::now();
- }
-
- void notify_keepalive_ack(utime_t keepalive_ack);
-
- void notify_ack();
+ seastar::future<> wait_io_exit_dispatching();
void requeue_out_sent_up_to(seq_num_t seq);
return is_out_queued() || !out_sent_msgs.empty();
}
- void ack_out_sent(seq_num_t seq);
-
- void set_last_keepalive(clock_t::time_point when) {
- last_keepalive = when;
- }
-
seq_num_t get_in_seq() const {
return in_seq;
}
- void set_in_seq(seq_num_t _in_seq) {
- in_seq = _in_seq;
- }
-
ChainedDispatchers& dispatchers;
SocketConnection &conn;
void notify_out_dispatch();
+ void ack_out_sent(seq_num_t seq);
+
+ seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+
+ void do_in_dispatch();
+
crimson::common::Gated gate;
/*
* in states for reading
*/
+ std::optional<seastar::shared_promise<>> in_exit_dispatching;
+
/// the seq num of the last received message
seq_num_t in_seq = 0;
#include "ProtocolV2.h"
-#include <seastar/core/lowres_clock.hh>
#include <fmt/format.h>
#include <fmt/ranges.h>
#include "include/msgr.h"
1, std::numeric_limits<uint64_t>::max());
}
-std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
-{
- ceph_assert(rx_frame_asm.get_num_segments() > 0);
- size_t sum = 0;
- // we don't include SegmentIndex::Msg::HEADER.
- for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
- sum += rx_frame_asm.get_segment_logical_len(idx);
- }
- return sum;
-}
-
} // namespace anonymous
namespace fmt {
if (likely(has_socket)) {
if (likely(is_socket_valid)) {
+ ceph_assert_always(state != state_t::READY);
frame_assembler.shutdown_socket();
is_socket_valid = false;
} else {
- ceph_assert_always(state == state_t::CONNECTING ||
- state == state_t::REPLACING);
+ ceph_assert_always(state != state_t::ESTABLISHING);
}
} else { // !has_socket
ceph_assert_always(state == state_t::CONNECTING);
assert(server_cookie == 0);
logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
}
- return seastar::when_all(
- wait_out_exit_dispatching(),
- wait_in_exit_dispatching()
- ).discard_result().then([this] {
+ return wait_io_exit_dispatching().then([this] {
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} before Socket::connect()",
conn, get_state_name(state));
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
// state may become CLOSING, close mover.socket and abort later
- return seastar::when_all(
- wait_out_exit_dispatching(),
- wait_in_exit_dispatching()
- ).discard_result().then([this] {
+ return wait_io_exit_dispatching(
+ ).then([this] {
protocol_timer.cancel();
auto done = std::move(execution_done);
execution_done = seastar::now();
// READY state
-void ProtocolV2::notify_out_fault(std::exception_ptr eptr)
-{
- fault(state_t::READY, "notify_out_fault", eptr);
-}
-
-seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size)
+void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
{
- return frame_assembler.read_frame_payload(
- ).then([this, throttle_stamp, msg_size](auto payload) {
- if (unlikely(state != state_t::READY)) {
- logger().debug("{} triggered {} during read_message()",
- conn, get_state_name(state));
- abort_protocol();
- }
-
- utime_t recv_stamp{seastar::lowres_system_clock::now()};
-
- // we need to get the size before std::moving segments data
- auto msg_frame = MessageFrame::Decode(*payload);
- // XXX: paranoid copy just to avoid oops
- ceph_msg_header2 current_header = msg_frame.header();
-
- logger().trace("{} got {} + {} + {} byte message,"
- " envelope type={} src={} off={} seq={}",
- conn, msg_frame.front_len(), msg_frame.middle_len(),
- msg_frame.data_len(), current_header.type, conn.get_peer_name(),
- current_header.data_off, current_header.seq);
-
- ceph_msg_header header{current_header.seq,
- current_header.tid,
- current_header.type,
- current_header.priority,
- current_header.version,
- ceph_le32(msg_frame.front_len()),
- ceph_le32(msg_frame.middle_len()),
- ceph_le32(msg_frame.data_len()),
- current_header.data_off,
- conn.get_peer_name(),
- current_header.compat_version,
- current_header.reserved,
- ceph_le32(0)};
- ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
- ceph_le32(0), ceph_le64(0), current_header.flags};
-
- auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this());
- Message *message = decode_message(nullptr, 0, header, footer,
- msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
- if (!message) {
- logger().warn("{} decode message failed", conn);
- abort_in_fault();
- }
-
- // store reservation size in message, so we don't get confused
- // by messages entering the dispatch queue through other paths.
- message->set_dispatch_throttle_size(msg_size);
-
- message->set_throttle_stamp(throttle_stamp);
- message->set_recv_stamp(recv_stamp);
- message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
-
- // check received seq#. if it is old, drop the message.
- // note that incoming messages may skip ahead. this is convenient for the
- // client side queueing because messages can't be renumbered, but the (kernel)
- // client will occasionally pull a message out of the sent queue to send
- // elsewhere. in that case it doesn't matter if we "got" it or not.
- uint64_t cur_seq = get_in_seq();
- if (message->get_seq() <= cur_seq) {
- logger().error("{} got old message {} <= {} {}, discarding",
- conn, message->get_seq(), cur_seq, *message);
- if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
- local_conf()->ms_die_on_old_message) {
- ceph_assert(0 == "old msgs despite reconnect_seq feature");
- }
- return seastar::now();
- } else if (message->get_seq() > cur_seq + 1) {
- logger().error("{} missed message? skipped from seq {} to {}",
- conn, cur_seq, message->get_seq());
- if (local_conf()->ms_die_on_skipped_message) {
- ceph_assert(0 == "skipped incoming seq");
- }
- }
-
- // note last received message.
- set_in_seq(message->get_seq());
- if (conn.policy.lossy) {
- logger().debug("{} <== #{} === {} ({})",
- conn,
- message->get_seq(),
- *message,
- message->get_type());
- } else {
- logger().debug("{} <== #{},{} === {} ({})",
- conn,
- message->get_seq(),
- current_header.ack_seq,
- *message,
- message->get_type());
- }
- notify_ack();
- ack_out_sent(current_header.ack_seq);
-
- // TODO: change MessageRef with seastar::shared_ptr
- auto msg_ref = MessageRef{message, false};
- assert(state == state_t::READY);
- // throttle the reading process by the returned future
- return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
- });
+ fault(state_t::READY, where, eptr);
}
void ProtocolV2::execute_ready()
{
- ceph_assert_always(is_socket_valid);
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
+ protocol_timer.cancel();
+ ceph_assert_always(is_socket_valid);
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
trigger_state(state_t::READY, out_state_t::open, false);
-#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- conn.interceptor->register_conn_ready(conn);
- }
-#endif
- ceph_assert_always(!in_exit_dispatching.has_value());
- in_exit_dispatching = seastar::shared_promise<>();
- gate.dispatch_in_background("execute_ready", *this, [this] {
- protocol_timer.cancel();
- return seastar::keep_doing([this] {
- return frame_assembler.read_main_preamble(
- ).then([this](auto ret) {
- switch (ret.tag) {
- case Tag::MESSAGE: {
- size_t msg_size = get_msg_size(*ret.rx_frame_asm);
- return seastar::futurize_invoke([this] {
- // throttle_message() logic
- if (!conn.policy.throttler_messages) {
- return seastar::now();
- }
- // TODO: message throttler
- ceph_assert(false);
- return seastar::now();
- }).then([this, msg_size] {
- // throttle_bytes() logic
- if (!conn.policy.throttler_bytes) {
- return seastar::now();
- }
- if (!msg_size) {
- return seastar::now();
- }
- logger().trace("{} wants {} bytes from policy throttler {}/{}",
- conn, msg_size,
- conn.policy.throttler_bytes->get_current(),
- conn.policy.throttler_bytes->get_max());
- return conn.policy.throttler_bytes->get(msg_size);
- }).then([this, msg_size] {
- // TODO: throttle_dispatch_queue() logic
- utime_t throttle_stamp{seastar::lowres_system_clock::now()};
- return read_message(throttle_stamp, msg_size);
- });
- }
- case Tag::ACK:
- return frame_assembler.read_frame_payload(
- ).then([this](auto payload) {
- // handle_message_ack() logic
- auto ack = AckFrame::Decode(payload->back());
- logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
- ack_out_sent(ack.seq());
- });
- case Tag::KEEPALIVE2:
- return frame_assembler.read_frame_payload(
- ).then([this](auto payload) {
- // handle_keepalive2() logic
- auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
- logger().debug("{} GOT KeepAliveFrame: timestamp={}",
- conn, keepalive_frame.timestamp());
- notify_keepalive_ack(keepalive_frame.timestamp());
- set_last_keepalive(seastar::lowres_system_clock::now());
- });
- case Tag::KEEPALIVE2_ACK:
- return frame_assembler.read_frame_payload(
- ).then([this](auto payload) {
- // handle_keepalive2_ack() logic
- auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
- auto _last_keepalive_ack =
- seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
- set_last_keepalive_ack(_last_keepalive_ack);
- logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
- conn, _last_keepalive_ack);
- });
- default: {
- unexpected_tag(ret.tag, conn, "execute_ready");
- return seastar::now();
- }
- }
- });
- }).handle_exception([this](std::exception_ptr eptr) {
- fault(state_t::READY, "execute_ready", eptr);
- }).finally([this] {
- ceph_assert_always(in_exit_dispatching.has_value());
- in_exit_dispatching->set_value();
- in_exit_dispatching = std::nullopt;
- });
- });
}
// STANDBY state
private:
void notify_out() override;
- void notify_out_fault(std::exception_ptr) override;
+ void notify_out_fault(const char *, std::exception_ptr) override;
private:
SocketMessenger &messenger;
uint64_t peer_global_seq = 0;
uint64_t connect_seq = 0;
- std::optional<seastar::shared_promise<>> in_exit_dispatching;
- seastar::future<> wait_in_exit_dispatching() {
- if (in_exit_dispatching.has_value()) {
- return in_exit_dispatching->get_shared_future();
- }
- return seastar::now();
- }
-
seastar::future<> execution_done = seastar::now();
template <typename Func>
uint64_t new_msg_seq);
// READY
- seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
void execute_ready();
// STANDBY