#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
+#include "Config.h"
#include "Dispatcher.h"
#include "Errors.h"
#include "Socket.h"
// Encodes `msg`, assigns it the next outgoing sequence number, wraps it in a
// MessageFrame built around a ceph_msg_header2 and hands it to write_frame().
// Returns the future produced by write_frame().
seastar::future<> ProtocolV2::write_message(MessageRef msg)
{
- // TODO not implemented
- // <scheduled by parent, to send out the message on the wire>
- ceph_assert(false);
+ // TODO: move to common code
+ // set priority
+ msg->get_header().src = messenger.get_myname(); // stamp this messenger's name as the source
+ // encode using this connection's feature bits
+ msg->encode(conn.features, 0);
+ // assign the next outgoing sequence number (out_seq is pre-incremented)
+ msg->set_seq(++conn.out_seq);
+ uint64_t ack_seq = conn.in_seq; // sent along in header2; in_seq is the last seq recorded by read_message()
+ // ack_left = 0;
+ // build a ceph_msg_header2 from fields of the message's ceph_msg_header/footer
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ 0, header.data_off, // NOTE(review): the literal 0 presumably fills a padding-length field — confirm against ceph_msg_header2
+ ack_seq,
+ footer.flags, header.compat_version,
+ header.reserved};
+ // the frame carries header2 plus the payload, middle and data sections of msg
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} write msg type={} off={} seq={}",
+ conn, header2.type, header2.data_off, header2.seq);
+ return write_frame(message, false); // NOTE(review): meaning of the `false` flag is not visible here — confirm against write_frame()
}
// Encodes a KeepAliveFrame and hands it to write_frame().
// Returns the future produced by write_frame().
seastar::future<> ProtocolV2::do_keepalive()
{
- // TODO not implemented
- // <scheduled by parent, to send out KeepAliveFrame on the wire>
- ceph_assert(false);
+ auto keepalive_frame = KeepAliveFrame::Encode(); // no arguments are supplied by this caller
+ return write_frame(keepalive_frame, false);
}
// Encodes a KeepAliveFrameAck carrying last_keepalive_ack_to_send and hands it
// to write_frame(). Returns the future produced by write_frame().
seastar::future<> ProtocolV2::do_keepalive_ack()
{
- // TODO not implemented
- // <scheduled by parent, to send out KeepAliveAckFrame on the wire>
- ceph_assert(false);
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send); // value is recorded when a KEEPALIVE2 frame is handled in execute_ready()
+ return write_frame(keepalive_ack_frame, false);
+}
+
+void ProtocolV2::handle_message_ack(seq_num_t seq) { // called with the ack seq taken from a received message header or AckFrame
+ if (conn.policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+ // NOTE(review): `seq` is not used yet; nothing happens for non-lossy connections until the TODO below is done
+ // TODO: lossless policy
+}
+
+// Reads the payload of a MESSAGE frame, rebuilds a ceph_msg_header/footer from
+// the received ceph_msg_header2, decodes the Message, checks its sequence
+// number against conn.in_seq and dispatches it under the pending_dispatch gate.
+//
+// throttle_stamp: recorded on the decoded message via set_throttle_stamp().
+// Returns a future that resolves once the frame has been read and the dispatch
+// has been started; the dispatch itself is not awaited here.
+// A message whose seq is <= conn.in_seq is logged and dropped.
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+{
+ return read_frame_payload()
+ .then([this, throttle_stamp] {
+ utime_t recv_stamp = ceph_clock_now();
+
+ // we need to get the size before std::moving segments data
+ const size_t cur_msg_size = get_current_msg_size();
+ auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
+
+ logger().debug("{} got {} + {} + {} byte message,"
+ " envelope type={} src={} off={} seq={}",
+ conn, msg_frame.front_len(), msg_frame.middle_len(),
+ msg_frame.data_len(), current_header.type, peer_name,
+ current_header.data_off, current_header.seq);
+
+ // section lengths come from the frame, the source is the peer's name
+ ceph_msg_header header{current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ msg_frame.front_len(),
+ msg_frame.middle_len(),
+ msg_frame.data_len(),
+ current_header.data_off,
+ peer_name,
+ current_header.compat_version,
+ current_header.reserved,
+ 0};
+ ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
+
+ Message *message = decode_message(nullptr, 0, header, footer,
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
+ if (!message) {
+ logger().warn("{} decode message failed", conn);
+ // NOTE(review): assumes abort_in_fault() does not return (e.g. throws);
+ // otherwise the null pointer would be dereferenced below — confirm
+ abort_in_fault();
+ }
+
+ // adopt the decoded message right away (add_ref = false) so that every
+ // exit path below, including the "old message" drop, releases it instead
+ // of leaking the raw pointer.
+ // TODO: change MessageRef with seastar::shared_ptr
+ auto msg_ref = MessageRef{message, false};
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(cur_msg_size);
+
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_stamp(recv_stamp);
+ message->set_recv_complete_stamp(ceph_clock_now());
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = conn.in_seq;
+ if (message->get_seq() <= cur_seq) {
+ logger().error("{} got old message {} <= {} {} {}, discarding",
+ conn, message->get_seq(), cur_seq, message, *message);
+ if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+ conf.ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ // msg_ref goes out of scope here and drops the message's reference
+ return;
+ } else if (message->get_seq() > cur_seq + 1) {
+ logger().error("{} missed message? skipped from seq {} to {}",
+ conn, cur_seq, message->get_seq());
+ if (conf.ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ // note last received message.
+ conn.in_seq = message->get_seq();
+ logger().debug("{} received message m={} seq={} from={} type={} {}",
+ conn, message, message->get_seq(), message->get_source(),
+ header.type, *message);
+
+ if (!conn.policy.lossy) {
+ // ++ack_left;
+ }
+ handle_message_ack(current_header.ack_seq);
+
+ // dispatch in the background; the gate tracks the in-flight dispatch and
+ // the lambda keeps the message alive through its captured reference
+ seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+ return dispatcher.ms_dispatch(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()),
+ std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
+ ceph_assert(false);
+ });
+ });
+ });
}
// Switches to state READY with write_state_t::open, then starts a read loop
// under the pending_dispatch gate: each iteration reads a frame preamble and
// handles the frame according to its tag. The loop's future is not awaited
// here; exceptions raised by the loop are passed to fault().
void ProtocolV2::execute_ready()
{
- // <schedule sending messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame,
- // i.e. trigger READY state with write_state_t::open>
- // trigger_state(state_t::READY, write_state_t::open, false);
- // TODO: schedule reading messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame
- state = state_t::READY;
- logger().info("{} reaches READY state successfully.", conn);
- close();
+ trigger_state(state_t::READY, write_state_t::open, false);
+ seastar::with_gate(pending_dispatch, [this] {
+ return seastar::keep_doing([this] { // repeats until an iteration fails with an exception
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::MESSAGE: { // run the throttle stages in order, then read the message body
+ return seastar::futurize_apply([this] {
+ // throttle_message() logic
+ if (!conn.policy.throttler_messages) {
+ return seastar::now();
+ }
+ // TODO: message throttler
+ ceph_assert(false); // a configured message throttler is not supported yet
+ return seastar::now();
+ }).then([this] {
+ // throttle_bytes() logic
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ size_t cur_msg_size = get_current_msg_size();
+ if (!cur_msg_size) { // nothing to take from the throttler for a zero-sized message
+ return seastar::now();
+ }
+ logger().debug("{} wants {} bytes from policy throttler {}/{}",
+ conn, cur_msg_size,
+ conn.policy.throttler_bytes->get_current(),
+ conn.policy.throttler_bytes->get_max());
+ return conn.policy.throttler_bytes->get(cur_msg_size); // NOTE(review): presumably resolves once the bytes are granted — confirm
+ }).then([this] {
+ // TODO: throttle_dispatch_queue() logic
+ utime_t throttle_stamp = ceph_clock_now(); // taken after the throttle stages above have completed
+ return read_message(throttle_stamp);
+ });
+ }
+ case Tag::ACK:
+ return read_frame_payload()
+ .then([this] {
+ // handle_message_ack() logic
+ auto ack = AckFrame::Decode(rx_segments_data.back()); // decoded from the last received segment
+ handle_message_ack(ack.seq());
+ });
+ case Tag::KEEPALIVE2:
+ return read_frame_payload()
+ .then([this] {
+ // handle_keepalive2() logic
+ auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+ last_keepalive_ack_to_send = keepalive_frame.timestamp(); // later encoded by do_keepalive_ack()
+ logger().debug("{} got KEEPALIVE2 {}",
+ conn, last_keepalive_ack_to_send);
+ conn.last_keepalive = ceph_clock_now();
+ notify_keepalive_ack(); // NOTE(review): presumably schedules do_keepalive_ack() — confirm
+ });
+ case Tag::KEEPALIVE2_ACK:
+ return read_frame_payload()
+ .then([this] {
+ // handle_keepalive2_ack() logic
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+ conn.last_keepalive_ack = keepalive_ack_frame.timestamp(); // timestamp carried by the ack frame
+ logger().debug("{} got KEEPALIVE_ACK {}",
+ conn, conn.last_keepalive_ack);
+ });
+ default: // any other tag is handed to unexpected_tag()
+ return unexpected_tag(tag, conn, "execute_ready");
+ }
+ });
+ }).handle_exception([this] (std::exception_ptr eptr) { // eptr itself is not inspected
+ // TODO: handle fault in READY state
+ return fault();
+ });
+ });
}
// STANDBY state