]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: expected tags validation 26628/head
authorRicardo Dias <rdias@suse.com>
Fri, 22 Feb 2019 11:06:19 +0000 (11:06 +0000)
committerRicardo Dias <rdias@suse.com>
Tue, 26 Feb 2019 16:01:15 +0000 (16:01 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index d76feae7384fbfa5e70c530a4cea0c21ea79c3ed..19fb065cc6534a6434deeaf7dcf09bd7104aa859 100644 (file)
@@ -54,8 +54,9 @@ void ProtocolV2::run_continuation(CtPtr continuation) {
 
 #define READB(L, B, C) read(CONTINUATION(C), L, B)
 
-#ifdef UNIT_TESTS_BUILT
+#define TAG_MASK(T) (1 << ((uint64_t)(T)-1))
 
+#ifdef UNIT_TESTS_BUILT
 #define INTERCEPT(S) { \
 if(connection->interceptor) { \
   auto a = connection->interceptor->intercept(connection, (S)); \
@@ -105,6 +106,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       can_write(false),
       bannerExchangeCallback(nullptr),
       next_payload_len(0),
+      sent_tag(static_cast<Tag>(0)),
+      next_tag(static_cast<Tag>(0)),
       keepalive(false) {
   temp_buffer = new char[4096];
 }
@@ -242,6 +245,9 @@ void ProtocolV2::reset_recv_state() {
   connection->pendingReadLen.reset();
   connection->writeCallback.reset();
 
+  sent_tag = static_cast<Tag>(0);
+  next_tag = static_cast<Tag>(0);
+
   if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
       connection->policy.throttler_messages) {
     ldout(cct, 10) << __func__ << " releasing " << 1
@@ -562,6 +568,8 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
   ldout(cct, 5) << __func__ << " sending message m=" << m
                 << " seq=" << m->get_seq() << " " << *m << dendl;
 
+  sent_tag = message.tag;
+
   m->trace.event("async writing message");
   ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
                  << " src=" << entity_name_t(messenger->get_myname())
@@ -676,6 +684,7 @@ void ProtocolV2::write_event() {
         s = in_seq;
         AckFrame ack(session_stream_handlers, in_seq);
         connection->outcoming_bl.claim_append(ack.get_buffer());
+        sent_tag = ack.tag;
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
         ack_left -= left;
@@ -751,6 +760,7 @@ template <class F>
 CtPtr ProtocolV2::write(const std::string &desc,
                         CONTINUATION_PARAM(next, ProtocolV2),
                         F &frame) {
+  sent_tag = frame.tag;
   return write(desc, CONTINUATION(next), frame.get_buffer());
 }
 
@@ -949,6 +959,65 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
   return callback;
 }
 
+uint64_t ProtocolV2::expected_tags(Tag sent_tag, Tag received_tag) {
+  switch(sent_tag) {
+    case Tag::HELLO:
+      if (received_tag == Tag::HELLO) {
+        ceph_assert(state == ACCEPTING);
+        return TAG_MASK(Tag::AUTH_REQUEST);
+      } else {
+        return TAG_MASK(Tag::HELLO);
+      }
+    case Tag::AUTH_REQUEST:
+    case Tag::AUTH_REQUEST_MORE:
+      return TAG_MASK(Tag::AUTH_REPLY_MORE) | TAG_MASK(Tag::AUTH_DONE) |
+             TAG_MASK(Tag::AUTH_BAD_METHOD);
+    case Tag::AUTH_BAD_METHOD:
+      return TAG_MASK(Tag::AUTH_REQUEST);
+    case Tag::AUTH_REPLY_MORE:
+      return TAG_MASK(Tag::AUTH_REQUEST_MORE);
+    case Tag::AUTH_DONE:
+      return TAG_MASK(Tag::CLIENT_IDENT) | TAG_MASK(Tag::SESSION_RECONNECT);
+    case Tag::CLIENT_IDENT:
+      if (state == READY) {
+        return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
+               TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
+      } else {
+        ceph_assert(state == CONNECTING);
+        return TAG_MASK(Tag::SERVER_IDENT) |
+               TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT);
+      }
+    case Tag::SESSION_RECONNECT:
+      if (state == READY) {
+        return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
+               TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
+      } else {
+        ceph_assert(state == CONNECTING);
+        return TAG_MASK(Tag::SESSION_RECONNECT_OK) |
+               TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) |
+               TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT);
+      }
+    case Tag::SESSION_RESET:
+      return TAG_MASK(Tag::CLIENT_IDENT);
+    case Tag::SESSION_RETRY:
+    case Tag::SESSION_RETRY_GLOBAL:
+      return TAG_MASK(Tag::SESSION_RECONNECT);
+    case Tag::SERVER_IDENT:
+    case Tag::SESSION_RECONNECT_OK:
+    case Tag::KEEPALIVE2:
+    case Tag::KEEPALIVE2_ACK:
+    case Tag::ACK:
+    case Tag::MESSAGE:
+      ceph_assert(state == READY);
+      return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
+             TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
+    case Tag::IDENT_MISSING_FEATURES:
+    case Tag::WAIT:
+      return 0;  // the peer should reset when receiving these tags
+  }
+  return 0;
+}
+
 CtPtr ProtocolV2::read_frame() {
   if (state == CLOSED) {
     return nullptr;
@@ -1014,8 +1083,17 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
       return _fault();
     }
 
+    Tag received_tag = next_tag;
     next_tag = static_cast<Tag>(main_preamble.tag);
 
+    uint64_t expected_tag_mask = expected_tags(sent_tag, received_tag);
+    if (!(TAG_MASK(next_tag) & expected_tag_mask)) {
+      lderr(cct) << __func__ << " received unexpected tag: expected=0x"
+                 << std::hex << expected_tag_mask << " got=0x"
+                 << TAG_MASK(next_tag) << std::dec << dendl;
+      return _fault();
+    }
+
     rx_segments_desc.clear();
     rx_segments_data.clear();
     next_payload_len = 0;
index 045be4623ce88e95a502aefc5e1d356a9e7a6253..a549ec80d6f14c8905099169ecc74ea21286e84d 100644 (file)
@@ -116,6 +116,7 @@ public:
 
 private:
 
+  ceph::msgr::v2::Tag sent_tag;
   ceph::msgr::v2::Tag next_tag;
   ceph_msg_header2 current_header;
   utime_t backoff;  // backoff time
@@ -140,6 +141,9 @@ private:
                         CONTINUATION_PARAM(next, ProtocolV2),
                         bufferlist &buffer);
 
+  uint64_t expected_tags(ceph::msgr::v2::Tag sent_tag,
+                         ceph::msgr::v2::Tag received_tag);
+
   void requeue_sent();
   uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
   void reset_recv_state();