From: Yingxin Cheng
Date: Tue, 19 Mar 2019 14:13:33 +0000 (+0800)
Subject: crimson/net: message frame exchanges (lossy part)
X-Git-Tag: v15.1.0~3027^2~9
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=153e4877ae8b0882d6f6c3cd8142802db2ce7a2f;p=ceph.git

crimson/net: message frame exchanges (lossy part)

Signed-off-by: Yingxin Cheng
---

diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
index 7a12511b1fd..115f0618006 100644
--- a/src/crimson/net/ProtocolV2.cc
+++ b/src/crimson/net/ProtocolV2.cc
@@ -9,6 +9,7 @@
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
 
+#include "Config.h"
 #include "Dispatcher.h"
 #include "Errors.h"
 #include "Socket.h"
@@ -1336,34 +1337,235 @@ seastar::future<> ProtocolV2::send_reconnect_ok()
 
 seastar::future<> ProtocolV2::write_message(MessageRef msg)
 {
-  // TODO not implemented
-  //
-  ceph_assert(false);
+  // Stamp the outgoing message with our name, the next out_seq and the
+  // current in_seq as ack, then encode and send it as a MessageFrame.
+  // TODO: move to common code
+  // set priority
+  msg->get_header().src = messenger.get_myname();
+
+  msg->encode(conn.features, 0);
+
+  msg->set_seq(++conn.out_seq);
+  uint64_t ack_seq = conn.in_seq;
+  // ack_left = 0;
+
+  ceph_msg_header &header = msg->get_header();
+  ceph_msg_footer &footer = msg->get_footer();
+
+  ceph_msg_header2 header2{header.seq, header.tid,
+                           header.type, header.priority,
+                           header.version,
+                           0, header.data_off,
+                           ack_seq,
+                           footer.flags, header.compat_version,
+                           header.reserved};
+
+  auto message = MessageFrame::Encode(header2,
+      msg->get_payload(), msg->get_middle(), msg->get_data());
+  logger().debug("{} write msg type={} off={} seq={}",
+                 conn, header2.type, header2.data_off, header2.seq);
+  return write_frame(message, false);
 }
 
 seastar::future<> ProtocolV2::do_keepalive()
 {
-  // TODO not implemented
-  //
-  ceph_assert(false);
+  // Encode and send a KeepAliveFrame.
+  auto keepalive_frame = KeepAliveFrame::Encode();
+  return write_frame(keepalive_frame, false);
 }
 
 seastar::future<> ProtocolV2::do_keepalive_ack()
 {
-  // TODO not implemented
-  //
-  ceph_assert(false);
+  // Reply with the timestamp recorded from the last received KEEPALIVE2.
+  auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
+  return write_frame(keepalive_ack_frame, false);
+}
+
+// Handle an ack for sequence number `seq`; currently a no-op (lossy
+// connections keep no sent messages, lossless handling is still a TODO).
+void ProtocolV2::handle_message_ack(seq_num_t seq) {
+  if (conn.policy.lossy) { // lossy connections don't keep sent messages
+    return;
+  }
+
+  // TODO: lossless policy
+}
+
+// Read the payload of a MESSAGE frame, decode it, validate its sequence
+// number and hand it to the dispatcher without waiting for the dispatch.
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+{
+  return read_frame_payload()
+  .then([this, throttle_stamp] {
+    utime_t recv_stamp = ceph_clock_now();
+
+    // we need to get the size before std::moving segments data
+    const size_t cur_msg_size = get_current_msg_size();
+    auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+    // XXX: paranoid copy just to avoid oops
+    ceph_msg_header2 current_header = msg_frame.header();
+
+    logger().debug("{} got {} + {} + {} byte message,"
+                   " envelope type={} src={} off={} seq={}",
+                   conn, msg_frame.front_len(), msg_frame.middle_len(),
+                   msg_frame.data_len(), current_header.type, peer_name,
+                   current_header.data_off, current_header.seq);
+
+    ceph_msg_header header{current_header.seq,
+                           current_header.tid,
+                           current_header.type,
+                           current_header.priority,
+                           current_header.version,
+                           msg_frame.front_len(),
+                           msg_frame.middle_len(),
+                           msg_frame.data_len(),
+                           current_header.data_off,
+                           peer_name,
+                           current_header.compat_version,
+                           current_header.reserved,
+                           0};
+    ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
+
+    Message *message = decode_message(nullptr, 0, header, footer,
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
+    if (!message) {
+      logger().warn("{} decode message failed", conn);
+      abort_in_fault();
+    }
+    // TODO: change MessageRef with seastar::shared_ptr
+    // hand the raw pointer to msg_ref right away (second argument false, as
+    // before) so the "old message" early return below releases it, not leaks.
+    auto msg_ref = MessageRef{message, false};
+
+    // store reservation size in message, so we don't get confused
+    // by messages entering the dispatch queue through other paths.
+    message->set_dispatch_throttle_size(cur_msg_size);
+
+    message->set_throttle_stamp(throttle_stamp);
+    message->set_recv_stamp(recv_stamp);
+    message->set_recv_complete_stamp(ceph_clock_now());
+
+    // check received seq#. if it is old, drop the message.
+    // note that incoming messages may skip ahead. this is convenient for the
+    // client side queueing because messages can't be renumbered, but the (kernel)
+    // client will occasionally pull a message out of the sent queue to send
+    // elsewhere. in that case it doesn't matter if we "got" it or not.
+    uint64_t cur_seq = conn.in_seq;
+    if (message->get_seq() <= cur_seq) {
+      logger().error("{} got old message {} <= {} {} {}, discarding",
+                     conn, message->get_seq(), cur_seq, message, *message);
+      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+          conf.ms_die_on_old_message) {
+        ceph_assert(0 == "old msgs despite reconnect_seq feature");
+      }
+      return;
+    } else if (message->get_seq() > cur_seq + 1) {
+      logger().error("{} missed message? skipped from seq {} to {}",
+                     conn, cur_seq, message->get_seq());
+      if (conf.ms_die_on_skipped_message) {
+        ceph_assert(0 == "skipped incoming seq");
+      }
+    }
+
+    // note last received message.
+    conn.in_seq = message->get_seq();
+    logger().debug("{} received message m={} seq={} from={} type={} {}",
+                   conn, message, message->get_seq(), message->get_source(),
+                   header.type, *message);
+
+    if (!conn.policy.lossy) {
+      // ++ack_left;
+    }
+    handle_message_ack(current_header.ack_seq);
+
+    seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+      return dispatcher.ms_dispatch(
+          seastar::static_pointer_cast<SocketConnection>(
+            conn.shared_from_this()),
+          std::move(msg))
+      .handle_exception([this] (std::exception_ptr eptr) {
+        logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
+        ceph_assert(false);
+      });
+    });
+  });
 }
 
 void ProtocolV2::execute_ready()
 {
-  //
-  // trigger_state(state_t::READY, write_state_t::open, false);
-  // TODO: schedule reading messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame
-  state = state_t::READY;
-  logger().info("{} reaches READY state successfully.", conn);
-  close();
+  // Enter READY and keep reading frames, dispatching on the preamble tag,
+  // until an exception routes to fault().
+  trigger_state(state_t::READY, write_state_t::open, false);
+  seastar::with_gate(pending_dispatch, [this] {
+    return seastar::keep_doing([this] {
+      return read_main_preamble()
+      .then([this] (Tag tag) {
+        switch (tag) {
+          case Tag::MESSAGE: {
+            return seastar::futurize_apply([this] {
+              // throttle_message() logic
+              if (!conn.policy.throttler_messages) {
+                return seastar::now();
+              }
+              // TODO: message throttler
+              ceph_assert(false);
+              return seastar::now();
+            }).then([this] {
+              // throttle_bytes() logic
+              if (!conn.policy.throttler_bytes) {
+                return seastar::now();
+              }
+              size_t cur_msg_size = get_current_msg_size();
+              if (!cur_msg_size) {
+                return seastar::now();
+              }
+              logger().debug("{} wants {} bytes from policy throttler {}/{}",
+                             conn, cur_msg_size,
+                             conn.policy.throttler_bytes->get_current(),
+                             conn.policy.throttler_bytes->get_max());
+              return conn.policy.throttler_bytes->get(cur_msg_size);
+            }).then([this] {
+              // TODO: throttle_dispatch_queue() logic
+              utime_t throttle_stamp = ceph_clock_now();
+              return read_message(throttle_stamp);
+            });
+          }
+          case Tag::ACK:
+            return read_frame_payload()
+            .then([this] {
+              // handle_message_ack() logic
+              auto ack = AckFrame::Decode(rx_segments_data.back());
+              handle_message_ack(ack.seq());
+            });
+          case Tag::KEEPALIVE2:
+            return read_frame_payload()
+            .then([this] {
+              // handle_keepalive2() logic
+              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+              last_keepalive_ack_to_send = keepalive_frame.timestamp();
+              logger().debug("{} got KEEPALIVE2 {}",
+                             conn, last_keepalive_ack_to_send);
+              conn.last_keepalive = ceph_clock_now();
+              notify_keepalive_ack();
+            });
+          case Tag::KEEPALIVE2_ACK:
+            return read_frame_payload()
+            .then([this] {
+              // handle_keepalive2_ack() logic
+              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+              conn.last_keepalive_ack = keepalive_ack_frame.timestamp();
+              logger().debug("{} got KEEPALIVE_ACK {}",
+                             conn, conn.last_keepalive_ack);
+            });
+          default:
+            return unexpected_tag(tag, conn, "execute_ready");
+        }
+      });
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      // TODO: handle fault in READY state
+      return fault();
+    });
+  });
 }
 
 // STANDBY state
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
index 08f935470fd..e176dc343fa 100644
--- a/src/crimson/net/ProtocolV2.h
+++ b/src/crimson/net/ProtocolV2.h
@@ -72,6 +72,8 @@ class ProtocolV2 final : public Protocol {
   uint64_t peer_global_seq = 0;
   uint64_t connect_seq = 0;
 
+  utime_t last_keepalive_ack_to_send;
+
   // TODO: Frame related implementations, probably to a separate class.
  private:
   bool record_io = false;
@@ -143,6 +145,8 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> send_reconnect_ok();
 
   // READY
+  seastar::future<> read_message(utime_t throttle_stamp);
+  void handle_message_ack(seq_num_t seq);
   void execute_ready();
 
   // STANDBY