From e9ac3e0530b73ebb43db291d6562712b28c1e653 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Fri, 22 Feb 2019 11:06:19 +0000 Subject: [PATCH] msg/async: msgr2: expected tags validation Signed-off-by: Ricardo Dias --- src/msg/async/ProtocolV2.cc | 80 ++++++++++++++++++++++++++++++++++++- src/msg/async/ProtocolV2.h | 4 ++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index d76feae7384fb..19fb065cc6534 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -54,8 +54,9 @@ void ProtocolV2::run_continuation(CtPtr continuation) { #define READB(L, B, C) read(CONTINUATION(C), L, B) -#ifdef UNIT_TESTS_BUILT +#define TAG_MASK(T) (1 << ((uint64_t)(T)-1)) +#ifdef UNIT_TESTS_BUILT #define INTERCEPT(S) { \ if(connection->interceptor) { \ auto a = connection->interceptor->intercept(connection, (S)); \ @@ -105,6 +106,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) can_write(false), bannerExchangeCallback(nullptr), next_payload_len(0), + sent_tag(static_cast(0)), + next_tag(static_cast(0)), keepalive(false) { temp_buffer = new char[4096]; } @@ -242,6 +245,9 @@ void ProtocolV2::reset_recv_state() { connection->pendingReadLen.reset(); connection->writeCallback.reset(); + sent_tag = static_cast(0); + next_tag = static_cast(0); + if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE && connection->policy.throttler_messages) { ldout(cct, 10) << __func__ << " releasing " << 1 @@ -562,6 +568,8 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; + sent_tag = message.tag; + m->trace.event("async writing message"); ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq() << " src=" << entity_name_t(messenger->get_myname()) @@ -676,6 +684,7 @@ void ProtocolV2::write_event() { s = in_seq; AckFrame ack(session_stream_handlers, in_seq); connection->outcoming_bl.claim_append(ack.get_buffer()); + sent_tag = ack.tag; ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left -= left; @@ -751,6 +760,7 @@ template CtPtr ProtocolV2::write(const std::string &desc, CONTINUATION_PARAM(next, ProtocolV2), F &frame) { + sent_tag = frame.tag; return write(desc, CONTINUATION(next), frame.get_buffer()); } @@ -949,6 +959,65 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload) return callback; } +uint64_t ProtocolV2::expected_tags(Tag sent_tag, Tag received_tag) { + switch(sent_tag) { + case Tag::HELLO: + if (received_tag == Tag::HELLO) { + ceph_assert(state == ACCEPTING); + return TAG_MASK(Tag::AUTH_REQUEST); + } else { + return TAG_MASK(Tag::HELLO); + } + case Tag::AUTH_REQUEST: + case Tag::AUTH_REQUEST_MORE: + return TAG_MASK(Tag::AUTH_REPLY_MORE) | TAG_MASK(Tag::AUTH_DONE) | + TAG_MASK(Tag::AUTH_BAD_METHOD); + case Tag::AUTH_BAD_METHOD: + return TAG_MASK(Tag::AUTH_REQUEST); + case Tag::AUTH_REPLY_MORE: + return TAG_MASK(Tag::AUTH_REQUEST_MORE); + case Tag::AUTH_DONE: + return TAG_MASK(Tag::CLIENT_IDENT) | TAG_MASK(Tag::SESSION_RECONNECT); + case Tag::CLIENT_IDENT: + if (state == READY) { + return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) | + TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK); + } else { + ceph_assert(state == CONNECTING); + return TAG_MASK(Tag::SERVER_IDENT) | + TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT); + } + case Tag::SESSION_RECONNECT: + if (state == READY) { + return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) | + TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK); + } else { + ceph_assert(state == CONNECTING); + return TAG_MASK(Tag::SESSION_RECONNECT_OK) | + TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) | + TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT); + } + case Tag::SESSION_RESET: + return TAG_MASK(Tag::CLIENT_IDENT); + case Tag::SESSION_RETRY: + case Tag::SESSION_RETRY_GLOBAL: + return TAG_MASK(Tag::SESSION_RECONNECT); + case Tag::SERVER_IDENT: + case Tag::SESSION_RECONNECT_OK: + case Tag::KEEPALIVE2: + case Tag::KEEPALIVE2_ACK: + case Tag::ACK: + case Tag::MESSAGE: + ceph_assert(state == READY); + return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) | + TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK); + case Tag::IDENT_MISSING_FEATURES: + case Tag::WAIT: + return 0; // the peer should reset when receiving these tags + } + return 0; +} + CtPtr ProtocolV2::read_frame() { if (state == CLOSED) { return nullptr; @@ -1014,8 +1083,17 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { return _fault(); } + Tag received_tag = next_tag; next_tag = static_cast(main_preamble.tag); + uint64_t expected_tag_mask = expected_tags(sent_tag, received_tag); + if (!(TAG_MASK(next_tag) & expected_tag_mask)) { + lderr(cct) << __func__ << " received unexpected tag: expected=0x" + << std::hex << expected_tag_mask << " got=0x" + << TAG_MASK(next_tag) << std::dec << dendl; + return _fault(); + } + rx_segments_desc.clear(); rx_segments_data.clear(); next_payload_len = 0; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 045be4623ce88..a549ec80d6f14 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -116,6 +116,7 @@ public: private: + ceph::msgr::v2::Tag sent_tag; ceph::msgr::v2::Tag next_tag; ceph_msg_header2 current_header; utime_t backoff; // backoff time @@ -140,6 +141,9 @@ private: CONTINUATION_PARAM(next, ProtocolV2), bufferlist &buffer); + uint64_t expected_tags(ceph::msgr::v2::Tag sent_tag, + ceph::msgr::v2::Tag received_tag); + void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); void reset_recv_state(); -- 2.39.5