]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: refactored the frame structures
authorRicardo Dias <rdias@suse.com>
Fri, 16 Nov 2018 17:17:34 +0000 (17:17 +0000)
committerRicardo Dias <rdias@suse.com>
Wed, 23 Jan 2019 13:59:25 +0000 (13:59 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/Protocol.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index ddf8bafe363ae3d2810e52e8a0686a5e333ebcf4..5a115cb57c562b3d43f4fcf99905a757b9efe910 100644 (file)
@@ -64,14 +64,6 @@ public:
     }                                                             \
   }
 
-#define CONTINUATION_RUN2(I, CT)                               \
-  {                                                            \
-    Ct<std::remove_reference<decltype(*I)>::type> *_cont = CT; \
-    while (_cont) {                                            \
-      _cont = _cont->call(I);                                  \
-    }                                                          \
-  }
-
 #define READ_HANDLER_CONTINUATION_DECL(C, F) \
   CONTINUATION_DECL(C, F, char *, int)
 #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
index b5b36f025d345d9796d179dbfa42f3d63753f36c..4e0c66cac5490b0a369971d9975dd7c15161ca43 100644 (file)
@@ -21,16 +21,31 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                 << " l=" << connection->policy.lossy << ").";
 }
 
+// TODO: REMOVE THIS
+void ProtocolV2::log(const std::string message, uint64_t val, uint64_t val2) {
+  ldout(cct, 1) << __func__ << " " << message << val << ":" << val2 << dendl;
+}
+
+using CtPtr = Ct<ProtocolV2> *;
+
+void ProtocolV2::run_continuation(CtPtr continuation) {
+  try {
+    CONTINUATION_RUN(continuation)
+  } catch (const buffer::error &e) {
+    lderr(cct) << __func__ << " failed decoding of frame header: " << e
+               << dendl;
+    _fault();
+  }
+}
+
 const int ASYNC_COALESCE_THRESHOLD = 256;
 
-#define WRITE(B, C) write(CONTINUATION(C), B)
+#define WRITE(B, D, C) write(D, CONTINUATION(C), B)
 
 #define READ(L, C) read(CONTINUATION(C), L)
 
 #define READB(L, B, C) read(CONTINUATION(C), L, B)
 
-using CtPtr = Ct<ProtocolV2> *;
-
 static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
   // create a buffer to read into that matches the data alignment
   unsigned alloc_len = 0;
@@ -48,6 +63,318 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
   data.push_back(std::move(ptr));
 }
 
+/**
+ * Protocol V2 Frame Structures
+ **/
+
+template <class T>
+struct Frame {
+protected:
+  bufferlist payload;
+  bufferlist frame_buffer;
+
+public:
+  Frame() {}
+
+  bufferlist &get_buffer() {
+    if (frame_buffer.length()) {
+      return frame_buffer;
+    }
+    encode((uint32_t)(payload.length() + sizeof(uint32_t)), frame_buffer, -1ll);
+    uint32_t tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+    ceph_assert(tag != 0);
+    encode(tag, frame_buffer, -1ll);
+    frame_buffer.claim_append(payload);
+    return frame_buffer;
+  }
+
+  void decode_frame(char *payload, uint32_t length) {
+    bufferlist bl;
+    bl.push_back(buffer::create_static(length, payload));
+    auto ti = bl.cbegin();
+    static_cast<T *>(this)->decode_payload(ti);
+  }
+
+  void decode_payload(bufferlist::const_iterator &ti) {}
+};
+
+template <class C, typename... Args>
+struct PayloadFrame : public Frame<C> {
+  std::tuple<Args...> _values;
+
+  template <typename T>
+  inline void _encode_payload_each(T &t) {
+    if constexpr (std::is_same<T, bufferlist const>()) {
+      encode((uint32_t)t.length(), this->payload, -1ll);
+      this->payload.claim_append((bufferlist &)t);
+    } else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
+      encode((uint32_t)t.size(), this->payload, -1ll);
+      for (const auto &elem : t) {
+        encode(elem, this->payload, 0);
+      }
+    } else {
+      encode(t, this->payload, -1ll);
+    }
+  }
+
+  PayloadFrame(const Args &... args) { (_encode_payload_each(args), ...); }
+
+  PayloadFrame(char *payload, uint32_t length) {
+    this->decode_frame(payload, length);
+  }
+
+  template <typename T>
+  inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const {
+    if constexpr (std::is_same<T, bufferlist>()) {
+      uint32_t len;
+      decode(len, ti);
+      ceph_assert(len == ti.get_remaining());
+      if (len) {
+        t.append(ti.get_current_ptr());
+      }
+    } else if constexpr (std::is_same<T, std::vector<uint32_t>>()) {
+      uint32_t size;
+      decode(size, ti);
+      for (uint32_t i = 0; i < size; ++i) {
+        decode(t[i], ti);
+      }
+    } else {
+      decode(t, ti);
+    }
+  }
+
+  template <std::size_t... Is>
+  inline void _decode_payload(bufferlist::const_iterator &ti,
+                              std::index_sequence<Is...>) const {
+    (_decode_payload_each((Args &)std::get<Is>(_values), ti), ...);
+  }
+
+  void decode_payload(bufferlist::const_iterator &ti) {
+    _decode_payload(ti, std::index_sequence_for<Args...>());
+  }
+
+  template <std::size_t N>
+  inline decltype(auto) get_val() {
+    return std::get<N>(_values);
+  }
+};
+
+struct AuthRequestFrame
+    : public PayloadFrame<AuthRequestFrame, uint32_t, bufferlist> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST;
+  using PayloadFrame::PayloadFrame;
+
+  AuthRequestFrame(uint32_t method) : AuthRequestFrame(method, bufferlist()) {}
+
+  inline uint32_t &method() { return get_val<0>(); }
+  inline bufferlist &auth_payload() { return get_val<1>(); }
+};
+
+struct AuthBadMethodFrame
+    : public PayloadFrame<AuthBadMethodFrame, uint32_t, std::vector<uint32_t>> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_METHOD;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint32_t &method() { return get_val<0>(); }
+  inline std::vector<uint32_t> &allowed_methods() { return get_val<1>(); }
+};
+
+struct AuthBadAuthFrame
+    : public PayloadFrame<AuthBadAuthFrame, uint32_t, std::string> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_AUTH;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint32_t &error_code() { return get_val<0>(); }
+  inline std::string &error_msg() { return get_val<1>(); }
+};
+
+struct AuthMoreFrame : public PayloadFrame<AuthMoreFrame, bufferlist> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_MORE;
+  using PayloadFrame::PayloadFrame;
+
+  inline bufferlist &auth_payload() { return get_val<0>(); }
+};
+
+struct AuthDoneFrame
+    : public PayloadFrame<AuthDoneFrame, uint64_t, bufferlist> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_DONE;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint64_t &flags() { return get_val<0>(); }
+  inline bufferlist &auth_payload() { return get_val<1>(); }
+};
+
+template <class T, typename... Args>
+struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
+protected:
+  ProtocolV2 *protocol;
+
+public:
+  SignedEncryptedFrame(ProtocolV2 *protocol, const Args &... args)
+      : PayloadFrame<T, Args...>(args...), protocol(protocol) {}
+
+  SignedEncryptedFrame(ProtocolV2 *protocol, char *payload, uint32_t length)
+      : PayloadFrame<T, Args...>(payload, length), protocol(protocol) {}
+
+  bufferlist &get_buffer() {
+    if (this->frame_buffer.length()) {
+      return this->frame_buffer;
+    }
+
+    bufferlist signature;
+    if (protocol->session_security) {
+      protocol->session_security->sign_bufferlist(this->payload, signature);
+      protocol->log("payload signature=", signature.length(), this->payload.length());
+    }
+
+    encode((uint32_t)(this->payload.length() + sizeof(uint32_t)),
+           this->frame_buffer, -1ll);
+    uint32_t tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+    ceph_assert(tag != 0);
+    encode(tag, this->frame_buffer, -1ll);
+    this->frame_buffer.claim_append(this->payload);
+    return this->frame_buffer;
+  }
+};
+
+struct ClientIdentFrame
+    : public SignedEncryptedFrame<ClientIdentFrame, entity_addrvec_t, int64_t,
+                                  uint64_t, uint64_t, uint64_t, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline entity_addrvec_t &addrs() { return get_val<0>(); }
+  inline int64_t &gid() { return get_val<1>(); }
+  inline uint64_t &global_seq() { return get_val<2>(); }
+  inline uint64_t &supported_features() { return get_val<3>(); }
+  inline uint64_t &required_features() { return get_val<4>(); }
+  inline uint64_t &flags() { return get_val<5>(); }
+};
+
+struct ServerIdentFrame
+    : public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t, int64_t,
+                                  uint64_t, uint64_t, uint64_t, uint64_t,
+                                  uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline entity_addrvec_t &addrs() { return get_val<0>(); }
+  inline int64_t &gid() { return get_val<1>(); }
+  inline uint64_t &global_seq() { return get_val<2>(); }
+  inline uint64_t &supported_features() { return get_val<3>(); }
+  inline uint64_t &required_features() { return get_val<4>(); }
+  inline uint64_t &flags() { return get_val<5>(); }
+  inline uint64_t &cookie() { return get_val<6>(); }
+};
+
+struct ReconnectFrame
+    : public SignedEncryptedFrame<ReconnectFrame, entity_addrvec_t, uint64_t,
+                                  uint64_t, uint64_t, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline entity_addrvec_t &addrs() { return get_val<0>(); }
+  inline uint64_t &cookie() { return get_val<1>(); }
+  inline uint64_t &global_seq() { return get_val<2>(); }
+  inline uint64_t &connect_seq() { return get_val<3>(); }
+  inline uint64_t &msg_seq() { return get_val<4>(); }
+};
+
+struct ResetFrame : public Frame<ResetFrame> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET;
+};
+
+struct RetryFrame : public SignedEncryptedFrame<RetryFrame, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  uint64_t connect_seq() { return get_val<0>(); }
+};
+
+struct RetryGlobalFrame
+    : public SignedEncryptedFrame<RetryGlobalFrame, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY_GLOBAL;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &global_seq() { return get_val<0>(); }
+};
+
+struct WaitFrame : public Frame<WaitFrame> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::WAIT;
+};
+
+struct ReconnectOkFrame
+    : public SignedEncryptedFrame<ReconnectOkFrame, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT_OK;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &msg_seq() { return get_val<0>(); }
+};
+
+struct IdentMissingFeaturesFrame
+    : public SignedEncryptedFrame<IdentMissingFeaturesFrame, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT_MISSING_FEATURES;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &features() { return get_val<0>(); }
+};
+
+struct MessageFrame : public Frame<MessageFrame> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE;
+  const unsigned int ASYNC_COALESCE_THRESHOLD = 256;
+
+  ceph_msg_header2 header2;
+
+  MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq,
+               bool calc_crc) {
+    ceph_msg_header &header = msg->get_header();
+    ceph_msg_footer &footer = msg->get_footer();
+
+    header2 = ceph_msg_header2{header.seq,        header.tid,
+                               header.type,       header.priority,
+                               header.version,    header.front_len,
+                               header.middle_len, 0,
+                               header.data_len,   header.data_off,
+                               ack_seq,           footer.front_crc,
+                               footer.middle_crc, footer.data_crc,
+                               footer.flags,      header.compat_version,
+                               header.reserved,   0};
+
+    if (calc_crc) {
+      header2.header_crc =
+          ceph_crc32c(0, (unsigned char *)&header2,
+                      sizeof(header2) - sizeof(header2.header_crc));
+    }
+
+    payload.append((char *)&header2, sizeof(header2));
+    if ((data.length() <= ASYNC_COALESCE_THRESHOLD) &&
+        (data.buffers().size() > 1)) {
+      for (const auto &pb : data.buffers()) {
+        payload.append((char *)pb.c_str(), pb.length());
+      }
+    } else {
+      payload.claim_append(data);
+    }
+  }
+};
+
+struct KeepAliveFrame : public SignedEncryptedFrame<KeepAliveFrame, utime_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  KeepAliveFrame(ProtocolV2 *protocol)
+      : KeepAliveFrame(protocol, ceph_clock_now()) {}
+
+  inline utime_t &timestamp() { return get_val<0>(); }
+};
+
+struct AckFrame : public SignedEncryptedFrame<AckFrame, uint64_t> {
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::ACK;
+  using SignedEncryptedFrame::SignedEncryptedFrame;
+
+  inline uint64_t &seq() { return get_val<0>(); }
+};
+
 ProtocolV2::ProtocolV2(AsyncConnection *connection)
     : Protocol(2, connection),
       temp_buffer(nullptr),
@@ -55,14 +382,17 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       peer_required_features(0),
       authorizer(nullptr),
       got_bad_auth(false),
+      got_bad_method(0),
+      auth_flags(0),
       cookie(0),
+      global_seq(0),
       connect_seq(0),
       peer_global_seq(0),
       message_seq(0),
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
-      next_frame_len(0),
+      next_payload_len(0),
       keepalive(false) {
   temp_buffer = new char[4096];
 }
@@ -78,11 +408,11 @@ void ProtocolV2::connect() {
   state = START_CONNECT;
 
   got_bad_auth = false;
+  got_bad_method = 0;
   if (authorizer) {
     delete authorizer;
     authorizer = nullptr;
   }
-  global_seq = messenger->get_global_seq();
 }
 
 void ProtocolV2::accept() { state = START_ACCEPT; }
@@ -101,10 +431,10 @@ void ProtocolV2::discard_out_queue() {
     (*p)->put();
   }
   sent.clear();
-  for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
+  for (map<int, list<pair<bufferlist, Message *>>>::iterator p =
            out_queue.begin();
        p != out_queue.end(); ++p) {
-    for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
+    for (list<pair<bufferlist, Message *>>::iterator r = p->second.begin();
          r != p->second.end(); ++r) {
       ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
       r->second->put();
@@ -162,7 +492,7 @@ void ProtocolV2::requeue_sent() {
     return;
   }
 
-  list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+  list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
   out_seq -= sent.size();
   while (!sent.empty()) {
     Message *m = sent.back();
@@ -179,7 +509,7 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
     return seq;
   }
-  list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+  list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
   uint64_t count = out_seq;
   while (!rq.empty()) {
     pair<bufferlist, Message *> p = rq.front();
@@ -202,6 +532,7 @@ void ProtocolV2::reset_recv_state() {
     }
     authorizer = nullptr;
     got_bad_auth = false;
+    got_bad_method = 0;
   }
 
   // clean read and write callbacks
@@ -390,22 +721,22 @@ void ProtocolV2::read_event() {
 
   switch (state) {
     case START_CONNECT:
-      CONTINUATION_RUN(CONTINUATION(start_client_banner_exchange));
+      run_continuation(CONTINUATION(start_client_banner_exchange));
       break;
     case START_ACCEPT:
-      CONTINUATION_RUN(CONTINUATION(start_server_banner_exchange));
+      run_continuation(CONTINUATION(start_server_banner_exchange));
       break;
     case READY:
-      CONTINUATION_RUN(CONTINUATION(read_frame));
+      run_continuation(CONTINUATION(read_frame));
       break;
     case THROTTLE_MESSAGE:
-      CONTINUATION_RUN(CONTINUATION(throttle_message));
+      run_continuation(CONTINUATION(throttle_message));
       break;
     case THROTTLE_BYTES:
-      CONTINUATION_RUN(CONTINUATION(throttle_bytes));
+      run_continuation(CONTINUATION(throttle_bytes));
       break;
     case THROTTLE_DISPATCH_QUEUE:
-      CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
+      run_continuation(CONTINUATION(throttle_dispatch_queue));
       break;
     default:
       break;
@@ -415,10 +746,10 @@ void ProtocolV2::read_event() {
 Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) {
   Message *m = 0;
   if (!out_queue.empty()) {
-    map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
+    map<int, list<pair<bufferlist, Message *>>>::reverse_iterator it =
         out_queue.rbegin();
     ceph_assert(!it->second.empty());
-    list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
+    list<pair<bufferlist, Message *>>::iterator p = it->second.begin();
     m = p->second;
     if (bl) {
       bl->swap(p->first);
@@ -477,14 +808,12 @@ ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
 
 void ProtocolV2::append_keepalive() {
   ldout(cct, 10) << __func__ << dendl;
-  KeepAliveFrame keepalive_frame;
+  KeepAliveFrame keepalive_frame(this);
   connection->outcoming_bl.claim_append(keepalive_frame.get_buffer());
 }
 
 void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
-  struct ceph_timespec ts;
-  timestamp.encode_timeval(&ts);
-  KeepAliveFrame keepalive_ack_frame(ts);
+  KeepAliveFrame keepalive_ack_frame(this, timestamp);
   connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer());
 }
 
@@ -566,7 +895,7 @@ void ProtocolV2::write_event() {
       if (left) {
         ceph_le64 s;
         s = in_seq;
-        AckFrame ack(in_seq);
+        AckFrame ack(this, in_seq);
         connection->outcoming_bl.claim_append(ack.get_buffer());
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
@@ -625,7 +954,7 @@ CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
   ssize_t r = connection->read(len, buffer,
                                [CONTINUATION(next), this](char *buffer, int r) {
                                  CONTINUATION(next)->setParams(buffer, r);
-                                 CONTINUATION_RUN(CONTINUATION(next));
+                                 run_continuation(CONTINUATION(next));
                                });
   if (r <= 0) {
     return CONTINUE(next, buffer, r);
@@ -634,14 +963,27 @@ CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
   return nullptr;
 }
 
-CtPtr ProtocolV2::write(CONTINUATION_PARAM(next, ProtocolV2, int),
+CtPtr ProtocolV2::write(const std::string &desc,
+                        CONTINUATION_PARAM(next, ProtocolV2),
                         bufferlist &buffer) {
-  ssize_t r = connection->write(buffer, [CONTINUATION(next), this](int r) {
-    CONTINUATION(next)->setParams(r);
-    CONTINUATION_RUN(CONTINUATION(next));
-  });
+  ssize_t r =
+      connection->write(buffer, [CONTINUATION(next), desc, this](int r) {
+        if (r < 0) {
+          ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
+                        << " (" << cpp_strerror(r) << ")" << dendl;
+          connection->inject_delay();
+          _fault();
+        }
+        run_continuation(CONTINUATION(next));
+      });
+
   if (r <= 0) {
-    return CONTINUE(next, r);
+    if (r < 0) {
+      ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
+                    << " (" << cpp_strerror(r) << ")" << dendl;
+      return _fault();
+    }
+    return CONTINUE(next);
   }
 
   return nullptr;
@@ -670,17 +1012,10 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) {
   bufferlist bl;
   bl.append(banner, banner_len);
 
-  return WRITE(bl, _banner_exchange_handle_write);
+  return WRITE(bl, "banner", _wait_for_peer_banner);
 }
 
-CtPtr ProtocolV2::_banner_exchange_handle_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " write banner failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
+CtPtr ProtocolV2::_wait_for_peer_banner() {
   unsigned banner_len =
       strlen(CEPH_BANNER_V2_PREFIX) + sizeof(uint8_t) + 2 * sizeof(__le64);
   return READ(banner_len, _banner_exchange_handle_peer_banner);
@@ -738,7 +1073,7 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
     if (connection->get_peer_type() != peer_type) {
       ldout(cct, 1) << __func__ << " connection peer type does not match what"
                     << " peer advertises " << connection->get_peer_type()
-                    << " != " << peer_type << dendl;
+                    << " != " << (int)peer_type << dendl;
       stop();
       connection->dispatch_queue->queue_reset(connection);
       return nullptr;
@@ -806,10 +1141,23 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
     return _fault();
   }
 
-  next_frame_len = *(uint32_t *)buffer - sizeof(uint32_t);
-  next_tag = static_cast<Tag>(*(uint32_t *)(buffer + sizeof(uint32_t)));
+  bufferlist bl;
+  bl.push_back(buffer::create_static(sizeof(uint32_t) * 2, buffer));
+  try {
+    auto ti = bl.cbegin();
+    uint32_t frame_len;
+    decode(frame_len, ti);
+    next_payload_len = frame_len - sizeof(uint32_t);
+    uint32_t tag;
+    decode(tag, ti);
+    next_tag = static_cast<Tag>(tag);
+  } catch (const buffer::error &e) {
+    lderr(cct) << __func__ << " failed decoding of frame header: " << e
+               << dendl;
+    return _fault();
+  }
 
-  ldout(cct, 10) << __func__ << " next frame_len=" << next_frame_len
+  ldout(cct, 10) << __func__ << " next payload_len=" << next_payload_len
                  << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
 
   switch (next_tag) {
@@ -827,13 +1175,19 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
     case Tag::KEEPALIVE2:
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
-      return READ(next_frame_len, handle_frame_payload);
+      return READ(next_payload_len, handle_frame_payload);
     case Tag::SESSION_RESET:
       return handle_session_reset();
     case Tag::WAIT:
       return handle_wait();
     case Tag::MESSAGE:
       return handle_message();
+    default: {
+      lderr(cct) << __func__
+                 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
+                 << dendl;
+      ceph_abort();
+    }
   }
 
   return nullptr;
@@ -850,33 +1204,33 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
 
   switch (next_tag) {
     case Tag::AUTH_REQUEST:
-      return handle_auth_request(buffer, next_frame_len);
+      return handle_auth_request(buffer, next_payload_len);
     case Tag::AUTH_BAD_METHOD:
-      return handle_auth_bad_method(buffer, next_frame_len);
+      return handle_auth_bad_method(buffer, next_payload_len);
     case Tag::AUTH_BAD_AUTH:
-      return handle_auth_bad_auth(buffer, next_frame_len);
+      return handle_auth_bad_auth(buffer, next_payload_len);
     case Tag::AUTH_MORE:
-      return handle_auth_more(buffer, next_frame_len);
+      return handle_auth_more(buffer, next_payload_len);
     case Tag::AUTH_DONE:
-      return handle_auth_done(buffer, next_frame_len);
+      return handle_auth_done(buffer, next_payload_len);
     case Tag::IDENT:
-      return handle_ident(buffer, next_frame_len);
+      return handle_ident(buffer, next_payload_len);
     case Tag::IDENT_MISSING_FEATURES:
-      return handle_ident_missing_features(buffer, next_frame_len);
+      return handle_ident_missing_features(buffer, next_payload_len);
     case Tag::SESSION_RECONNECT:
-      return handle_reconnect(buffer, next_frame_len);
+      return handle_reconnect(buffer, next_payload_len);
     case Tag::SESSION_RETRY:
-      return handle_session_retry(buffer, next_frame_len);
+      return handle_session_retry(buffer, next_payload_len);
     case Tag::SESSION_RETRY_GLOBAL:
-      return handle_session_retry_global(buffer, next_frame_len);
+      return handle_session_retry_global(buffer, next_payload_len);
     case Tag::SESSION_RECONNECT_OK:
-      return handle_reconnect_ok(buffer, next_frame_len);
+      return handle_reconnect_ok(buffer, next_payload_len);
     case Tag::KEEPALIVE2:
-      return handle_keepalive2(buffer, next_frame_len);
+      return handle_keepalive2(buffer, next_payload_len);
     case Tag::KEEPALIVE2_ACK:
-      return handle_keepalive2_ack(buffer, next_frame_len);
+      return handle_keepalive2_ack(buffer, next_payload_len);
     case Tag::ACK:
-      return handle_message_ack(buffer, next_frame_len);
+      return handle_message_ack(buffer, next_payload_len);
     default:
       ceph_abort();
   }
@@ -888,46 +1242,25 @@ CtPtr ProtocolV2::handle_auth_more(char *payload, uint32_t length) {
 
   AuthMoreFrame auth_more(payload, length);
   ldout(cct, 1) << __func__
-                << " auth more len=" << auth_more.auth_payload.length()
+                << " auth more len=" << auth_more.auth_payload().length()
                 << dendl;
 
   if (state == CONNECTING) {
     ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl;
-
-    ceph_assert(authorizer);
-    authorizer->add_challenge(cct, auth_more.auth_payload);
-    AuthMoreFrame more_reply(authorizer->bl);
-    return WRITE(more_reply.get_buffer(), handle_auth_more_write);
-
+    if (auth_method == CEPH_AUTH_CEPHX) {
+      ceph_assert(authorizer);
+      authorizer->add_challenge(cct, auth_more.auth_payload());
+      AuthMoreFrame more_reply(authorizer->bl);
+      return WRITE(more_reply.get_buffer(), "auth more", read_frame);
+    } else {
+      ceph_abort("Auth method %d not implemented", auth_method);
+    }
   } else if (state == ACCEPTING) {
-    connection->lock.unlock();
-    bool authorizer_valid;
-    bufferlist authorizer_reply;
-    ceph_assert((bool)authorizer_challenge);
-    if (!messenger->ms_deliver_verify_authorizer(
-            connection, connection->peer_type, auth_method,
-            auth_more.auth_payload, authorizer_reply, authorizer_valid,
-            session_key, &authorizer_challenge) ||
-        !authorizer_valid) {
-      connection->lock.lock();
-
-      ldout(cct, 0) << __func__ << " got bad authorizer, auth_reply_len="
-                    << authorizer_reply.length() << dendl;
-      session_security.reset();
-      AuthBadAuthFrame bad_auth(EPERM, "Bad Authorizer");
-      bufferlist &bl = bad_auth.get_buffer();
-      return WRITE(bl, handle_auth_bad_auth_write);
+    if (auth_method == CEPH_AUTH_CEPHX) {
+      return handle_cephx_auth(auth_more.auth_payload());
+    } else {
+      ceph_abort("Auth method %d not implemented", auth_method);
     }
-
-    connection->lock.lock();
-
-    session_security.reset(get_auth_session_handler(
-        cct, auth_method, session_key,
-        CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2));
-
-    AuthDoneFrame auth_done(0, authorizer_reply);
-    bufferlist &bl = auth_done.get_buffer();
-    return WRITE(bl, handle_auth_done_write);
   } else {
     ceph_abort();
   }
@@ -935,18 +1268,6 @@ CtPtr ProtocolV2::handle_auth_more(char *payload, uint32_t length) {
   return nullptr;
 }
 
-CtPtr ProtocolV2::handle_auth_more_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " auth more write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
-}
-
 CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) {
   if (state == CONNECTING) {
     return handle_server_ident(payload, length);
@@ -1185,7 +1506,7 @@ CtPtr ProtocolV2::read_message_data_prepare() {
 
   if (data_len) {
     // get a buffer
-    map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
+    map<ceph_tid_t, pair<bufferlist, int>>::iterator p =
         connection->rx_buffers.find(current_header.tid);
     if (p != connection->rx_buffers.end()) {
       ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
@@ -1406,16 +1727,16 @@ CtPtr ProtocolV2::handle_message_complete() {
 CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  KeepAliveFrame keepalive_frame(payload, length);
+  KeepAliveFrame keepalive_frame(this, payload, length);
 
   ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
 
-  utime_t kp_t = utime_t(keepalive_frame.timestamp);
   connection->write_lock.lock();
-  append_keepalive_ack(kp_t);
+  append_keepalive_ack(keepalive_frame.timestamp());
   connection->write_lock.unlock();
 
-  ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
+  ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
+                 << keepalive_frame.timestamp() << dendl;
   connection->set_last_keepalive(ceph_clock_now());
 
   if (is_connected()) {
@@ -1428,8 +1749,8 @@ CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
 CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  KeepAliveFrame keepalive_ack_frame(payload, length);
-  connection->set_last_keepalive_ack(utime_t(keepalive_ack_frame.timestamp));
+  KeepAliveFrame keepalive_ack_frame(this, payload, length);
+  connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
   ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
 
   return CONTINUE(read_frame);
@@ -1438,8 +1759,8 @@ CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) {
 CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  AckFrame ack(payload, length);
-  handle_message_ack(ack.seq);
+  AckFrame ack(this, payload, length);
+  handle_message_ack(ack.seq());
   return CONTINUE(read_frame);
 }
 
@@ -1449,6 +1770,8 @@ CtPtr ProtocolV2::start_client_banner_exchange() {
   ldout(cct, 20) << __func__ << dendl;
   state = CONNECTING;
 
+  global_seq = messenger->get_global_seq();
+
   return _banner_exchange(CONTINUATION(post_client_banner_exchange));
 }
 
@@ -1461,7 +1784,7 @@ CtPtr ProtocolV2::post_client_banner_exchange() {
   return send_auth_request();
 }
 
-CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> allowed_methods) {
+CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
   ldout(cct, 20) << __func__ << dendl;
 
   if (!authorizer) {
@@ -1485,7 +1808,7 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> allowed_methods) {
                    << connection->peer_type << dendl;
     AuthRequestFrame authFrame(auth_method);
     bufferlist &bl = authFrame.get_buffer();
-    return WRITE(bl, handle_auth_request_write);
+    return WRITE(bl, "auth request", read_frame);
   }
 
   auth_method = authorizer->protocol;
@@ -1505,30 +1828,25 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> allowed_methods) {
 
   AuthRequestFrame authFrame(auth_method, authorizer->bl);
   bufferlist &bl = authFrame.get_buffer();
-  return WRITE(bl, handle_auth_request_write);
-}
-
-CtPtr ProtocolV2::handle_auth_request_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " auth request write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
+  return WRITE(bl, "auth request", read_frame);
 }
 
 CtPtr ProtocolV2::handle_auth_bad_method(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
   AuthBadMethodFrame bad_method(payload, length);
-  ldout(cct, 1) << __func__ << " auth method=" << bad_method.method
-                << " rejected, allowed methods=" << bad_method.allowed_methods
+  ldout(cct, 1) << __func__ << " auth method=" << bad_method.method()
+                << " rejected, allowed methods=" << bad_method.allowed_methods()
                 << dendl;
 
-  return send_auth_request(bad_method.allowed_methods);
+  if (got_bad_method == bad_method.allowed_methods().size()) {
+    ldout(cct, 1) << __func__ << " too many attempts, closing connection"
+                  << dendl;
+    return _fault();
+  }
+  got_bad_method++;
+
+  return send_auth_request(bad_method.allowed_methods());
 }
 
 CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) {
@@ -1536,10 +1854,12 @@ CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) {
 
   AuthBadAuthFrame bad_auth(payload, length);
   ldout(cct, 1) << __func__ << " authentication failed"
-                << " error code=" << bad_auth.error_code
-                << " error message=" << bad_auth.error_msg << dendl;
+                << " error code=" << bad_auth.error_code()
+                << " error message=" << bad_auth.error_msg() << dendl;
 
   if (got_bad_auth) {
+    ldout(cct, 1) << __func__ << " too many attempts, closing connection"
+                  << dendl;
     return _fault();
   }
 
@@ -1553,7 +1873,7 @@ CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) {
 
   AuthRequestFrame authFrame(auth_method, authorizer->bl);
   bufferlist &bl = authFrame.get_buffer();
-  return WRITE(bl, handle_auth_request_write);
+  return WRITE(bl, "auth request", read_frame);
 }
 
 CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
@@ -1562,7 +1882,7 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
   AuthDoneFrame auth_done(payload, length);
 
   if (authorizer) {
-    auto iter = auth_done.auth_payload.cbegin();
+    auto iter = auth_done.auth_payload().cbegin();
     if (!authorizer->verify_reply(iter)) {
       ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
       return _fault();
@@ -1570,7 +1890,8 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
   }
 
   ldout(cct, 1) << __func__ << " authentication done,"
-                << " flags=" << auth_done.flags << dendl;
+                << " flags=" << std::hex << auth_done.flags() << std::dec
+                << dendl;
 
   if (authorizer) {
     ldout(cct, 10) << __func__ << " setting up session_security with auth "
@@ -1586,6 +1907,8 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
     session_security.reset();
   }
 
+  auth_flags = auth_done.flags();
+
   if (!cookie) {
     ceph_assert(connect_seq == 0);
     return send_client_ident();
@@ -1603,7 +1926,7 @@ CtPtr ProtocolV2::send_client_ident() {
     flags |= CEPH_MSG_CONNECT_LOSSY;
   }
 
-  ClientIdentFrame client_ident(messenger->get_myaddrs(),
+  ClientIdentFrame client_ident(this, messenger->get_myaddrs(),
                                 messenger->get_myname().num(), global_seq,
                                 connection->policy.features_supported,
                                 connection->policy.features_required, flags);
@@ -1618,54 +1941,30 @@ CtPtr ProtocolV2::send_client_ident() {
                 << " flags=" << flags << std::dec << dendl;
 
   bufferlist &bl = client_ident.get_buffer();
-  return WRITE(bl, handle_client_ident_write);
-}
-
-CtPtr ProtocolV2::handle_client_ident_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " client ident write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
+  return WRITE(bl, "client ident", read_frame);
 }
 
 CtPtr ProtocolV2::send_reconnect() {
   ldout(cct, 20) << __func__ << dendl;
 
-  ReconnectFrame reconnect(messenger->get_myaddrs(), cookie, global_seq,
+  ReconnectFrame reconnect(this, messenger->get_myaddrs(), cookie, global_seq,
                            connect_seq, in_seq);
 
   ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie
                 << " gs=" << global_seq << " cs=" << connect_seq
                 << " ms=" << in_seq << dendl;
   bufferlist &bl = reconnect.get_buffer();
-  return WRITE(bl, handle_reconnect_write);
-}
-
-CtPtr ProtocolV2::handle_reconnect_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " reconnect write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
+  return WRITE(bl, "reconnect", read_frame);
 }
 
 CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
                                                 uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  IdentMissingFeaturesFrame ident_missing(payload, length);
+  IdentMissingFeaturesFrame ident_missing(this, payload, length);
   lderr(cct) << __func__
              << " client does not support all server features: " << std::hex
-             << ident_missing.features << std::dec << dendl;
+             << ident_missing.features() << std::dec << dendl;
 
   return _fault();
 }
@@ -1682,11 +1981,11 @@ CtPtr ProtocolV2::handle_session_reset() {
 CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  RetryFrame retry(payload, length);
-  connect_seq = retry.connect_seq + 1;
+  RetryFrame retry(this, payload, length);
+  connect_seq = retry.connect_seq() + 1;
 
   ldout(cct, 1) << __func__
-                << " received session retry connect_seq=" << retry.connect_seq
+                << " received session retry connect_seq=" << retry.connect_seq()
                 << ", inc to cs=" << connect_seq << dendl;
 
   return send_reconnect();
@@ -1695,11 +1994,11 @@ CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) {
 CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  RetryGlobalFrame retry(payload, length);
-  global_seq = messenger->get_global_seq(retry.global_seq);
+  RetryGlobalFrame retry(this, payload, length);
+  global_seq = messenger->get_global_seq(retry.global_seq());
 
   ldout(cct, 1) << __func__ << " received session retry global global_seq="
-                << retry.global_seq << ", choose new gs=" << global_seq
+                << retry.global_seq() << ", choose new gs=" << global_seq
                 << dendl;
 
   return send_reconnect();
@@ -1715,11 +2014,12 @@ CtPtr ProtocolV2::handle_wait() {
 CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  ReconnectOkFrame reconnect_ok(payload, length);
+  ReconnectOkFrame reconnect_ok(this, payload, length);
   ldout(cct, 5) << __func__
-                << " reconnect accepted: sms=" << reconnect_ok.msg_seq << dendl;
+                << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
+                << dendl;
 
-  out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq);
+  out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
 
   backoff = utime_t();
   ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
@@ -1739,25 +2039,26 @@ CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) {
 CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  ServerIdentFrame server_ident(payload, length);
+  ServerIdentFrame server_ident(this, payload, length);
   ldout(cct, 5) << __func__ << " received server identification: "
-                << "addrs=" << server_ident.addrs << " gid=" << server_ident.gid
-                << " global_seq=" << server_ident.global_seq
+                << "addrs=" << server_ident.addrs()
+                << " gid=" << server_ident.gid()
+                << " global_seq=" << server_ident.global_seq()
                 << " features_supported=" << std::hex
-                << server_ident.supported_features
-                << " features_required=" << server_ident.required_features
-                << " flags=" << server_ident.flags << " cookie=" << std::dec
-                << server_ident.cookie << dendl;
+                << server_ident.supported_features()
+                << " features_required=" << server_ident.required_features()
+                << " flags=" << server_ident.flags() << " cookie=" << std::dec
+                << server_ident.cookie() << dendl;
 
-  cookie = server_ident.cookie;
+  cookie = server_ident.cookie();
 
-  connection->set_peer_addrs(server_ident.addrs);
-  connection->peer_global_id = server_ident.gid;
-  connection->set_features(server_ident.supported_features &
+  connection->set_peer_addrs(server_ident.addrs());
+  connection->peer_global_id = server_ident.gid();
+  connection->set_features(server_ident.supported_features() &
                            connection->policy.features_supported);
-  peer_global_seq = server_ident.global_seq;
+  peer_global_seq = server_ident.global_seq();
 
-  connection->policy.lossy = server_ident.flags & CEPH_MSG_CONNECT_LOSSY;
+  connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
 
   backoff = utime_t();
   ldout(cct, 10) << __func__ << " connect success " << connect_seq
@@ -1792,51 +2093,24 @@ CtPtr ProtocolV2::post_server_banner_exchange() {
   return CONTINUE(read_frame);
 }
 
-CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
-  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
-
-  AuthRequestFrame auth_request(payload, length);
-
-  ldout(cct, 10) << __func__ << " AuthRequest(method=" << auth_request.method
-                 << ", auth_len=" << auth_request.len << ")" << dendl;
-
-  std::vector<uint32_t> allowed_methods;
-  messenger->ms_deliver_get_auth_allowed_methods(connection->peer_type,
-                                                 allowed_methods);
-
-  bool found = std::find(allowed_methods.begin(), allowed_methods.end(),
-                         auth_request.method) != allowed_methods.end();
-  if (!found) {
-    ldout(cct, 1) << __func__ << " auth method=" << auth_request.method
-                  << " not allowed" << dendl;
-    AuthBadMethodFrame bad_method(auth_request.method, allowed_methods);
-    bufferlist &bl = bad_method.get_buffer();
-    return WRITE(bl, handle_auth_bad_method_write);
-  }
-
-  ldout(cct, 10) << __func__ << " auth method=" << auth_request.method
-                 << " accepted" << dendl;
-
-  auth_method = auth_request.method;
+CtPtr ProtocolV2::handle_cephx_auth(bufferlist &auth_payload) {
+  ldout(cct, 20) << __func__ << dendl;
 
-  if (auth_request.method == CEPH_AUTH_NONE) {
-    ldout(cct, 1) << __func__ << " proceeding without authentication" << dendl;
+  ceph_assert(auth_method == CEPH_AUTH_CEPHX);
 
-    session_security.reset();
-    bufferlist empty_bl;
-    AuthDoneFrame auth_done(0, empty_bl);
-    bufferlist &bl = auth_done.get_buffer();
-    return WRITE(bl, handle_auth_done_write);
-  }
+  ldout(cct, 15) << __func__
+                 << " authorizer payload len=" << auth_payload.length()
+                 << dendl;
 
-  connection->lock.unlock();
   bool authorizer_valid;
   bufferlist authorizer_reply;
   bool had_challenge = (bool)authorizer_challenge;
+
+  connection->lock.unlock();
   if (!messenger->ms_deliver_verify_authorizer(
-          connection, connection->peer_type, auth_request.method,
-          auth_request.auth_payload, authorizer_reply, authorizer_valid,
-          session_key, &authorizer_challenge) ||
+          connection, connection->peer_type, auth_method, auth_payload,
+          authorizer_reply, authorizer_valid, session_key,
+          &authorizer_challenge) ||
       !authorizer_valid) {
     connection->lock.lock();
 
@@ -1844,14 +2118,13 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
       ldout(cct, 10) << __func__ << " challenging authorizer" << dendl;
       ceph_assert(authorizer_reply.length());
       AuthMoreFrame more(authorizer_reply);
-      return WRITE(more.get_buffer(), handle_auth_more_write);
+      return WRITE(more.get_buffer(), "auth more", read_frame);
     } else {
       ldout(cct, 0) << __func__ << " got bad authorizer, auth_reply_len="
                     << authorizer_reply.length() << dendl;
       session_security.reset();
       AuthBadAuthFrame bad_auth(EPERM, "Bad Authorizer");
-      bufferlist &bl = bad_auth.get_buffer();
-      return WRITE(bl, handle_auth_bad_auth_write);
+      return WRITE(bad_auth.get_buffer(), "bad auth", read_frame);
     }
   }
 
@@ -1861,85 +2134,106 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
       get_auth_session_handler(cct, auth_method, session_key,
                                CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2));
 
-  AuthDoneFrame auth_done(0, authorizer_reply);
-  bufferlist &bl = auth_done.get_buffer();
-  return WRITE(bl, handle_auth_done_write);
+  if (cct->_conf.get_val<bool>("ms_msgr2_sign_messages")) {
+    auth_flags |= static_cast<uint64_t>(AuthFlag::SIGNED);
+  }
+  if (cct->_conf.get_val<bool>("ms_msgr2_encrypt_messages")) {
+    auth_flags |= static_cast<uint64_t>(AuthFlag::ENCRYPTED);
+  }
+
+  ldout(cct, 1) << __func__ << " authentication done,"
+                << " flags=" << std::hex << auth_flags << std::dec << dendl;
+
+  AuthDoneFrame auth_done(auth_flags, authorizer_reply);
+  return WRITE(auth_done.get_buffer(), "auth done", read_frame);
 }
 
-CtPtr ProtocolV2::handle_auth_bad_method_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " auth bad method write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
+  AuthRequestFrame auth_request(payload, length);
 
-  return CONTINUE(read_frame);
-}
+  ldout(cct, 10) << __func__ << " AuthRequest(method=" << auth_request.method()
+                 << ", auth_len=" << auth_request.auth_payload().length() << ")"
+                 << dendl;
 
-CtPtr ProtocolV2::handle_auth_bad_auth_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+  std::vector<uint32_t> allowed_methods;
+  messenger->ms_deliver_get_auth_allowed_methods(connection->peer_type,
+                                                 allowed_methods);
 
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " auth bad auth write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
+  bool found = std::find(allowed_methods.begin(), allowed_methods.end(),
+                         auth_request.method()) != allowed_methods.end();
+  if (!found) {
+    ldout(cct, 1) << __func__ << " auth method=" << auth_request.method()
+                  << " not allowed" << dendl;
+    AuthBadMethodFrame bad_method(auth_request.method(), allowed_methods);
+    bufferlist &bl = bad_method.get_buffer();
+    return WRITE(bl, "bad auth method", read_frame);
   }
 
-  return CONTINUE(read_frame);
-}
+  ldout(cct, 10) << __func__ << " auth method=" << auth_request.method()
+                 << " accepted" << dendl;
 
-CtPtr ProtocolV2::handle_auth_done_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+  auth_method = auth_request.method();
 
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " auth done write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
+  if (auth_method == CEPH_AUTH_NONE) {
+    ldout(cct, 1) << __func__ << " proceeding without authentication" << dendl;
+
+    session_security.reset();
+    bufferlist empty_bl;
+    AuthDoneFrame auth_done(0, empty_bl);
+    return WRITE(auth_done.get_buffer(), "auth done", read_frame);
   }
 
-  return CONTINUE(read_frame);
+  if (auth_method == CEPH_AUTH_CEPHX) {
+    return handle_cephx_auth(auth_request.auth_payload());
+  }
+
+  lderr(cct) << __func__ << " auth method " << auth_method << " not implemented"
+             << dendl;
+  ceph_abort();
+  return nullptr;
 }
 
 CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
 
-  ClientIdentFrame client_ident(payload, length);
+  ClientIdentFrame client_ident(this, payload, length);
 
   ldout(cct, 5) << __func__ << " received client identification: "
-                << "addrs=" << client_ident.addrs << " gid=" << client_ident.gid
-                << " global_seq=" << client_ident.global_seq
+                << "addrs=" << client_ident.addrs()
+                << " gid=" << client_ident.gid()
+                << " global_seq=" << client_ident.global_seq()
                 << " features_supported=" << std::hex
-                << client_ident.supported_features
-                << " features_required=" << client_ident.required_features
-                << " flags=" << client_ident.flags << std::dec << dendl;
+                << client_ident.supported_features()
+                << " features_required=" << client_ident.required_features()
+                << " flags=" << client_ident.flags() << std::dec << dendl;
 
-  if (client_ident.addrs.empty()) {
+  if (client_ident.addrs().empty()) {
     connection->set_peer_addr(connection->target_addr);
   } else {
     // Should we check if one of the ident.addrs match connection->target_addr
     // as we do in ProtocolV1?
-    connection->set_peer_addrs(client_ident.addrs);
-    connection->target_addr = client_ident.addrs.msgr2_addr();
+    connection->set_peer_addrs(client_ident.addrs());
+    connection->target_addr = client_ident.addrs().msgr2_addr();
   }
 
   uint64_t feat_missing = connection->policy.features_required &
-                          ~(uint64_t)client_ident.supported_features;
+                          ~(uint64_t)client_ident.supported_features();
   if (feat_missing) {
     ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                   << feat_missing << std::dec << dendl;
-    IdentMissingFeaturesFrame ident_missing_features(feat_missing);
+    IdentMissingFeaturesFrame ident_missing_features(this, feat_missing);
 
     bufferlist &bl = ident_missing_features.get_buffer();
-    return WRITE(bl, handle_ident_missing_features_write);
+    return WRITE(bl, "ident missing features", read_frame);
   }
 
   connection_features =
-      client_ident.supported_features & connection->policy.features_supported;
+      client_ident.supported_features() & connection->policy.features_supported;
 
   state = ACCEPTING_SESSION;
-  peer_global_seq = client_ident.global_seq;
+  peer_global_seq = client_ident.global_seq();
 
   // Looks good so far, let's check if there is already an existing connection
   // to this peer.
@@ -1966,36 +2260,24 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
   return send_server_ident();
 }
 
-CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
-                  << " (" << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
-}
-
 CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
 
-  ReconnectFrame reconnect(payload, length);
+  ReconnectFrame reconnect(this, payload, length);
 
   ldout(cct, 5) << __func__
-                << " received reconnect: cookie=" << reconnect.cookie
-                << " gs=" << reconnect.global_seq
-                << " cs=" << reconnect.connect_seq
-                << " ms=" << reconnect.msg_seq << dendl;
+                << " received reconnect: cookie=" << reconnect.cookie()
+                << " gs=" << reconnect.global_seq()
+                << " cs=" << reconnect.connect_seq()
+                << " ms=" << reconnect.msg_seq() << dendl;
 
-  if (reconnect.addrs.empty()) {
+  if (reconnect.addrs().empty()) {
     connection->set_peer_addr(connection->target_addr);
   } else {
     // Should we check if one of the ident.addrs match connection->target_addr
     // as we do in ProtocolV1?
-    connection->set_peer_addrs(reconnect.addrs);
-    connection->target_addr = reconnect.addrs.msgr2_addr();
+    connection->set_peer_addrs(reconnect.addrs());
+    connection->target_addr = reconnect.addrs().msgr2_addr();
   }
 
   connection->lock.unlock();
@@ -2020,7 +2302,7 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
     // session
     ldout(cct, 0) << __func__
                   << " no existing connection exists, reseting client" << dendl;
-    return WRITE(bl, handle_session_reset_write);
+    return WRITE(bl, "session reset", read_frame);
   }
 
   std::lock_guard<std::mutex> l(existing->lock);
@@ -2034,79 +2316,56 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
   if (exproto->state == CLOSED) {
     ldout(cct, 5) << __func__ << " existing " << existing
                   << " already closed. Reseting client" << dendl;
-    return WRITE(bl, handle_session_reset_write);
+    return WRITE(bl, "session reset", read_frame);
   }
 
   if (exproto->replacing) {
     ldout(cct, 1) << __func__
                   << " existing racing replace happened while replacing."
                   << " existing=" << existing << dendl;
-    RetryGlobalFrame retry(exproto->peer_global_seq);
+    RetryGlobalFrame retry(this, exproto->peer_global_seq);
     bufferlist &bl = retry.get_buffer();
-    return WRITE(bl, handle_session_retry_write);
+    return WRITE(bl, "session retry", read_frame);
   }
 
   if (!exproto->cookie) {
     // server connection was reseted, reset client
     ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl;
-    return WRITE(bl, handle_session_reset_write);
-  } else if (exproto->cookie != reconnect.cookie) {
+    return WRITE(bl, "session reset", read_frame);
+  } else if (exproto->cookie != reconnect.cookie()) {
     ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie
-                  << " cc=" << reconnect.cookie << ", reseting client" << dendl;
-    return WRITE(bl, handle_session_reset_write);
+                  << " cc=" << reconnect.cookie() << ", reseting client"
+                  << dendl;
+    return WRITE(bl, "session reset", read_frame);
   }
 
-  if (exproto->peer_global_seq > reconnect.global_seq) {
+  if (exproto->peer_global_seq > reconnect.global_seq()) {
     ldout(cct, 5) << __func__
                   << " stale global_seq: sgs=" << exproto->peer_global_seq
-                  << " cgs=" << reconnect.global_seq
+                  << " cgs=" << reconnect.global_seq()
                   << ", ask client to retry global" << dendl;
-    RetryGlobalFrame retry(exproto->peer_global_seq);
+    RetryGlobalFrame retry(this, exproto->peer_global_seq);
     bufferlist &bl = retry.get_buffer();
-    return WRITE(bl, handle_session_retry_write);
+    return WRITE(bl, "session retry", read_frame);
   }
 
-  if (exproto->connect_seq >= reconnect.connect_seq) {
+  if (exproto->connect_seq >= reconnect.connect_seq()) {
     ldout(cct, 5) << __func__
                   << " stale connect_seq scs=" << exproto->connect_seq
-                  << " ccs=" << reconnect.connect_seq
+                  << " ccs=" << reconnect.connect_seq()
                   << " , ask client to retry" << dendl;
-    RetryFrame retry(exproto->connect_seq);
+    RetryFrame retry(this, exproto->connect_seq);
     bufferlist &bl = retry.get_buffer();
-    return WRITE(bl, handle_session_retry_write);
+    return WRITE(bl, "session retry", read_frame);
   }
 
   // everything looks good
-  exproto->connect_seq = reconnect.connect_seq;
-  exproto->message_seq = reconnect.msg_seq;
+  exproto->connect_seq = reconnect.connect_seq();
+  exproto->message_seq = reconnect.msg_seq();
 
   return reuse_connection(existing, exproto, true);
 }
 
-CtPtr ProtocolV2::handle_session_reset_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " session reset write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
-}
-
-CtPtr ProtocolV2::handle_session_retry_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " session retry write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
-  }
-
-  return CONTINUE(read_frame);
-}
-
 CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
   ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
 
@@ -2130,7 +2389,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
                   << " existing=" << existing << dendl;
     WaitFrame wait;
     bufferlist &bl = wait.get_buffer();
-    return WRITE(bl, handle_wait_write);
+    return WRITE(bl, "wait", read_frame);
   }
 
   if (existing->policy.lossy) {
@@ -2180,20 +2439,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
                 messenger->get_myaddrs().msgr2_addr());
     WaitFrame wait;
     bufferlist &bl = wait.get_buffer();
-    return WRITE(bl, handle_wait_write);
-  }
-}
-
-CtPtr ProtocolV2::handle_wait_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " wait write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    return _fault();
+    return WRITE(bl, "wait", read_frame);
   }
-
-  return CONTINUE(read_frame);
 }
 
 CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
@@ -2230,6 +2477,10 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
                            << dendl;
   exproto->can_write = false;
   exproto->replacing = true;
+  exproto->session_security = session_security;
+  exproto->auth_method = auth_method;
+  exproto->session_key = session_key;
+  exproto->authorizer_challenge = std::move(authorizer_challenge);
   existing->state_offset = 0;
   // avoid previous thread modify event
   exproto->state = NONE;
@@ -2285,9 +2536,9 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
           existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                               existing->read_handler);
           if (!reconnect) {
-            CONTINUATION_RUN2(exproto, exproto->send_server_ident())
+            exproto->run_continuation(exproto->send_server_ident());
           } else {
-            CONTINUATION_RUN2(exproto, exproto->send_reconnect_ok())
+            exproto->run_continuation(exproto->send_reconnect_ok());
           }
         };
         if (existing->center->in_thread())
@@ -2320,7 +2571,7 @@ CtPtr ProtocolV2::send_server_ident() {
   }
 
   uint64_t gs = messenger->get_global_seq();
-  ServerIdentFrame server_ident(
+  ServerIdentFrame server_ident(this,
       messenger->get_myaddrs(), messenger->get_myname().num(), gs,
       connection->policy.features_supported,
       connection->policy.features_required, flags, cookie);
@@ -2367,18 +2618,11 @@ CtPtr ProtocolV2::send_server_ident() {
   messenger->ms_deliver_handle_fast_accept(connection);
 
   bufferlist &bl = server_ident.get_buffer();
-  return WRITE(bl, handle_server_ident_write);
+  return WRITE(bl, "server ident", server_ready);
 }
 
-CtPtr ProtocolV2::handle_server_ident_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    connection->inject_delay();
-    return _fault();
-  }
+CtPtr ProtocolV2::server_ready() {
+  ldout(cct, 20) << __func__ << dendl;
 
   if (connection->delay_state) {
     ceph_assert(connection->delay_state->ready());
@@ -2393,7 +2637,7 @@ CtPtr ProtocolV2::send_reconnect_ok() {
   out_seq = discard_requeued_up_to(out_seq, message_seq);
 
   uint64_t ms = in_seq;
-  ReconnectOkFrame reconnect_ok(ms);
+  ReconnectOkFrame reconnect_ok(this, ms);
 
   ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
 
@@ -2428,22 +2672,5 @@ CtPtr ProtocolV2::send_reconnect_ok() {
   messenger->ms_deliver_handle_fast_accept(connection);
 
   bufferlist &bl = reconnect_ok.get_buffer();
-  return WRITE(bl, handle_reconnect_ok_write);
-}
-
-CtPtr ProtocolV2::handle_reconnect_ok_write(int r) {
-  ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " reconnect ok write failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
-    connection->inject_delay();
-    return _fault();
-  }
-
-  if (connection->delay_state) {
-    ceph_assert(connection->delay_state->ready());
-  }
-
-  return ready();
+  return WRITE(bl, "reconnect ok", server_ready);
 }
index be44f379bf447c643d2351d0b29e19c6fa87cab9..e46323645f881124e14a3ed9983e310ca22a3234 100644 (file)
@@ -45,8 +45,9 @@ private:
     return statenames[state];
   }
 
+public:
   enum class Tag : uint32_t {
-    AUTH_REQUEST,
+    AUTH_REQUEST = 1,
     AUTH_BAD_METHOD,
     AUTH_BAD_AUTH,
     AUTH_MORE,
@@ -65,384 +66,8 @@ private:
     ACK
   };
 
-  struct Frame {
-    uint32_t tag;
-    bufferlist payload;
-    bufferlist frame_buffer;
-
-    Frame(Tag tag) : tag(static_cast<uint32_t>(tag)) {
-      encode(this->tag, payload, 0);
-    }
-
-    Frame() {}
-
-    bufferlist &get_buffer() {
-      if (frame_buffer.length()) {
-        return frame_buffer;
-      }
-      encode((uint32_t)payload.length(), frame_buffer, 0);
-      frame_buffer.claim_append(payload);
-      return frame_buffer;
-    }
-  };
-
-  struct SignedEncryptedFrame : public Frame {
-    SignedEncryptedFrame(Tag tag) : Frame(tag) {}
-    SignedEncryptedFrame() : Frame() {}
-    bufferlist &get_buffer() { return Frame::get_buffer(); }
-  };
-
-  struct AuthRequestFrame : public Frame {
-    uint32_t method;
-    uint32_t len;
-    bufferlist auth_payload;
-
-    AuthRequestFrame(uint32_t method, bufferlist &auth_payload)
-        : Frame(Tag::AUTH_REQUEST) {
-      encode(method, payload, 0);
-      encode(auth_payload.length(), payload, 0);
-      payload.claim_append(auth_payload);
-    }
-
-    AuthRequestFrame(uint32_t method) : Frame(Tag::AUTH_REQUEST) {
-      encode(method, payload, 0);
-      encode((uint32_t)0, payload, 0);
-    }
-
-    AuthRequestFrame(char *payload, uint32_t length) : Frame() {
-      method = *(uint32_t *)payload;
-      len = *(uint32_t *)(payload + sizeof(uint32_t));
-      ceph_assert((length - (sizeof(uint32_t) * 2)) == len);
-      auth_payload.append((payload + (sizeof(uint32_t) * 2)), len);
-    }
-  };
-
-  struct AuthBadMethodFrame : public Frame {
-    uint32_t method;
-    std::vector<uint32_t> allowed_methods;
-
-    AuthBadMethodFrame(uint32_t method, std::vector<uint32_t> methods)
-        : Frame(Tag::AUTH_BAD_METHOD) {
-      encode(method, payload, 0);
-      encode((uint32_t)methods.size(), payload, 0);
-      for (const auto &a_meth : methods) {
-        encode(a_meth, payload, 0);
-      }
-    }
-
-    AuthBadMethodFrame(char *payload, uint32_t length) : Frame() {
-      method = *(uint32_t *)payload;
-      uint32_t num_methods = *(uint32_t *)(payload + sizeof(uint32_t));
-      payload += sizeof(uint32_t) * 2;
-      for (unsigned i = 0; i < num_methods; ++i) {
-        allowed_methods.push_back(
-            *(uint32_t *)(payload + sizeof(uint32_t) * i));
-      }
-    }
-  };
-
-  struct AuthBadAuthFrame : public Frame {
-    uint32_t error_code;
-    std::string error_msg;
-
-    AuthBadAuthFrame(uint32_t error_code, std::string error_msg)
-        : Frame(Tag::AUTH_BAD_AUTH) {
-      encode(error_code, payload, 0);
-      encode(error_msg, payload, 0);
-    }
-
-    AuthBadAuthFrame(char *payload, uint32_t length) : Frame() {
-      error_code = *(uint32_t *)payload;
-      uint32_t len = *(uint32_t *)(payload + sizeof(uint32_t));
-      error_msg = std::string(payload + sizeof(uint32_t) * 2, len);
-    }
-  };
-
-  struct AuthMoreFrame : public Frame {
-    bufferlist auth_payload;
-
-    AuthMoreFrame(bufferlist &auth_payload) : Frame(Tag::AUTH_MORE) {
-      encode(auth_payload.length(), payload, 0);
-      payload.claim_append(auth_payload);
-    }
-
-    AuthMoreFrame(char *payload, uint32_t length) : Frame() {
-      uint32_t len = *(uint32_t *)payload;
-      ceph_assert((length - sizeof(uint32_t)) == len);
-      auth_payload.append(payload + sizeof(uint32_t), len);
-    }
-  };
-
-  struct AuthDoneFrame : public Frame {
-    uint64_t flags;
-    bufferlist auth_payload;
-
-    AuthDoneFrame(uint64_t flags, bufferlist &auth_payload)
-        : Frame(Tag::AUTH_DONE) {
-      encode(flags, payload, 0);
-      encode(auth_payload.length(), payload, 0);
-      payload.claim_append(auth_payload);
-    }
-
-    AuthDoneFrame(char *payload, uint32_t length) : Frame() {
-      flags = *(uint64_t *)payload;
-      payload += sizeof(uint64_t);
-      uint32_t len = *(uint32_t *)payload;
-      ceph_assert((length - sizeof(uint32_t) - sizeof(uint64_t)) == len);
-      auth_payload.append(payload + sizeof(uint32_t), len);
-    }
-  };
-
-  struct ClientIdentFrame : public SignedEncryptedFrame {
-    entity_addrvec_t addrs;
-    int64_t gid;
-    uint64_t global_seq;
-    uint64_t supported_features;  // CEPH_FEATURE_*
-    uint64_t required_features;   // CEPH_FEATURE_*
-    uint64_t flags;               // CEPH_MSG_CONNECT_*
-
-    ClientIdentFrame(const entity_addrvec_t &addrs, int64_t gid,
-                     uint64_t global_seq, uint64_t supported_features,
-                     uint64_t required_features, uint64_t flags)
-        : SignedEncryptedFrame(Tag::IDENT) {
-      encode(addrs, payload, -1ll);
-      encode(gid, payload, -1ll);
-      encode(global_seq, payload, -1ll);
-      encode(supported_features, payload, -1ll);
-      encode(required_features, payload, -1ll);
-      encode(flags, payload, -1ll);
-    }
-
-    ClientIdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      bufferlist bl;
-      bl.push_back(buffer::create_static(length, payload));
-      try {
-        auto ti = bl.cbegin();
-        decode_frame(ti);
-      } catch (const buffer::error &e) {
-      }
-    }
-
-    ClientIdentFrame() : SignedEncryptedFrame() {}
-
-  protected:
-    void decode_frame(ceph::buffer::list::const_iterator &ti) {
-      decode(addrs, ti);
-      decode(gid, ti);
-      decode(global_seq, ti);
-      decode(supported_features, ti);
-      decode(required_features, ti);
-      decode(flags, ti);
-    }
-  };
-
-  struct ServerIdentFrame : public ClientIdentFrame {
-    uint64_t cookie;
-
-    ServerIdentFrame(const entity_addrvec_t &addrs, int64_t gid,
-                     uint64_t global_seq, uint64_t supported_features,
-                     uint64_t required_features, uint64_t flags,
-                     uint64_t cookie)
-        : ClientIdentFrame(addrs, gid, global_seq, supported_features,
-                           required_features, flags) {
-      encode(cookie, payload, -1ll);
-    }
-
-    ServerIdentFrame(char *payload, uint32_t length) : ClientIdentFrame() {
-      bufferlist bl;
-      bl.push_back(buffer::create_static(length, payload));
-      try {
-        auto ti = bl.cbegin();
-        ClientIdentFrame::decode_frame(ti);
-        decode(cookie, ti);
-      } catch (const buffer::error &e) {
-      }
-    }
-  };
-
-  struct ReconnectFrame : public SignedEncryptedFrame {
-    entity_addrvec_t addrs;
-    uint64_t cookie;
-    uint64_t global_seq;
-    uint64_t connect_seq;
-    uint64_t msg_seq;
-
-    ReconnectFrame(const entity_addrvec_t &addrs, uint64_t cookie,
-                   uint64_t global_seq, uint64_t connect_seq, uint64_t msg_seq)
-        : SignedEncryptedFrame(Tag::SESSION_RECONNECT) {
-      encode(addrs, payload, -1ll);
-      encode(cookie, payload, 0);
-      encode(global_seq, payload, 0);
-      encode(connect_seq, payload, 0);
-      encode(msg_seq, payload, 0);
-    }
-
-    ReconnectFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      bufferlist bl;
-      bl.push_back(buffer::create_static(length, payload));
-      try {
-        auto ti = bl.cbegin();
-        decode(addrs, ti);
-        decode(cookie, ti);
-        decode(global_seq, ti);
-        decode(connect_seq, ti);
-        decode(msg_seq, ti);
-      } catch (const buffer::error &e) {
-      }
-    }
-  };
-
-  struct ResetFrame : public SignedEncryptedFrame {
-    ResetFrame() : SignedEncryptedFrame(Tag::SESSION_RESET) {}
-  };
-
-  struct RetryFrame : public SignedEncryptedFrame {
-    uint64_t connect_seq;
-
-    RetryFrame(uint64_t connect_seq)
-        : SignedEncryptedFrame(Tag::SESSION_RETRY) {
-      encode(connect_seq, payload);
-    }
-
-    RetryFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      bufferlist bl;
-      bl.push_back(buffer::create_static(length, payload));
-      try {
-        auto ti = bl.cbegin();
-        decode(connect_seq, ti);
-      } catch (const buffer::error &e) {
-      }
-    }
-  };
-
-  struct RetryGlobalFrame : public SignedEncryptedFrame {
-    uint64_t global_seq;
-
-    RetryGlobalFrame(uint64_t global_seq)
-        : SignedEncryptedFrame(Tag::SESSION_RETRY_GLOBAL) {
-      encode(global_seq, payload);
-    }
-
-    RetryGlobalFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      bufferlist bl;
-      bl.push_back(buffer::create_static(length, payload));
-      try {
-        auto ti = bl.cbegin();
-        decode(global_seq, ti);
-      } catch (const buffer::error &e) {
-      }
-    }
-  };
-
-  struct WaitFrame : public SignedEncryptedFrame {
-    WaitFrame() : SignedEncryptedFrame(Tag::WAIT) {}
-  };
-
-  struct ReconnectOkFrame : public SignedEncryptedFrame {
-    uint64_t msg_seq;
-
-    ReconnectOkFrame(uint64_t msg_seq)
-        : SignedEncryptedFrame(Tag::SESSION_RECONNECT_OK) {
-      encode(msg_seq, payload, 0);
-    }
-
-    ReconnectOkFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      bufferlist bl;
-      bl.push_back(buffer::create_static(length, payload));
-      try {
-        auto ti = bl.cbegin();
-        decode(msg_seq, ti);
-      } catch (const buffer::error &e) {
-      }
-    }
-  };
-
-  struct IdentMissingFeaturesFrame : public SignedEncryptedFrame {
-    __le64 features;
-
-    IdentMissingFeaturesFrame(uint64_t features)
-        : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES),
-          features(features) {
-      encode(features, payload, -1ll);
-    }
-
-    IdentMissingFeaturesFrame(char *payload, uint32_t length)
-        : SignedEncryptedFrame() {
-      features = *(uint64_t *)payload;
-    }
-  };
-
-  struct MessageFrame : public SignedEncryptedFrame {
-    const unsigned int ASYNC_COALESCE_THRESHOLD = 256;
-
-    ceph_msg_header2 header2;
-
-    MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq,
-                 bool calc_crc)
-        : SignedEncryptedFrame(Tag::MESSAGE) {
-      ceph_msg_header &header = msg->get_header();
-      ceph_msg_footer &footer = msg->get_footer();
-
-      header2 = ceph_msg_header2{header.seq,        header.tid,
-                                 header.type,       header.priority,
-                                 header.version,    header.front_len,
-                                 header.middle_len, 0,
-                                 header.data_len,   header.data_off,
-                                 ack_seq,           footer.front_crc,
-                                 footer.middle_crc, footer.data_crc,
-                                 footer.flags,      header.compat_version,
-                                 header.reserved,   0};
-
-      if (calc_crc) {
-        header2.header_crc =
-            ceph_crc32c(0, (unsigned char *)&header2,
-                        sizeof(header2) - sizeof(header2.header_crc));
-      }
-
-      payload.append((char *)&header2, sizeof(header2));
-      if ((data.length() <= ASYNC_COALESCE_THRESHOLD) &&
-          (data.buffers().size() > 1)) {
-        for (const auto &pb : data.buffers()) {
-          payload.append((char *)pb.c_str(), pb.length());
-        }
-      } else {
-        payload.claim_append(data);
-      }
-    }
-  };
-
-  struct KeepAliveFrame : public SignedEncryptedFrame {
-    struct ceph_timespec timestamp;
-
-    KeepAliveFrame() : SignedEncryptedFrame(Tag::KEEPALIVE2) {
-      struct ceph_timespec ts;
-      utime_t t = ceph_clock_now();
-      t.encode_timeval(&ts);
-      payload.append((char *)&ts, sizeof(ts));
-    }
-
-    KeepAliveFrame(struct ceph_timespec &timestamp)
-        : SignedEncryptedFrame(Tag::KEEPALIVE2_ACK) {
-      payload.append((char *)&timestamp, sizeof(timestamp));
-    }
-
-    KeepAliveFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      ceph_assert(length == sizeof(timestamp));
-      timestamp = *(struct ceph_timespec *)payload;
-    }
-  };
-
-  struct AckFrame : public SignedEncryptedFrame {
-    uint64_t seq;
-
-    AckFrame(uint64_t seq) : SignedEncryptedFrame(Tag::ACK) {
-      encode(seq, payload, 0);
-    }
-
-    AckFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
-      seq = *(uint64_t *)payload;
-    }
-  };
+private:
+  enum class AuthFlag : uint64_t { ENCRYPTED = 1, SIGNED = 2 };
 
   char *temp_buffer;
   State state;
@@ -450,9 +75,11 @@ private:
   AuthAuthorizer *authorizer;
   uint32_t auth_method;
   bool got_bad_auth;
+  uint32_t got_bad_method;
   CryptoKey session_key;
   std::shared_ptr<AuthSessionHandler> session_security;
   std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
+  uint64_t auth_flags;
   uint64_t connection_features;
   uint64_t cookie;
   uint64_t global_seq;
@@ -470,7 +97,7 @@ private:
   using ProtFuncPtr = void (ProtocolV2::*)();
   Ct<ProtocolV2> *bannerExchangeCallback;
 
-  uint32_t next_frame_len;
+  uint32_t next_payload_len;
   Tag next_tag;
   ceph_msg_header2 current_header;
   utime_t backoff;  // backoff time
@@ -484,11 +111,13 @@ private:
   bool keepalive;
 
   ostream &_conn_prefix(std::ostream *_dout);
+  inline void run_continuation(Ct<ProtocolV2> *continuation);
 
   Ct<ProtocolV2> *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
                        int len, char *buffer = nullptr);
-  Ct<ProtocolV2> *write(CONTINUATION_PARAM(next, ProtocolV2, int),
-                        bufferlist &bl);
+  Ct<ProtocolV2> *write(const std::string &desc,
+                        CONTINUATION_PARAM(next, ProtocolV2),
+                        bufferlist &buffer);
 
   void requeue_sent();
   uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
@@ -503,18 +132,17 @@ private:
   void append_keepalive_ack(utime_t &timestamp);
   void handle_message_ack(uint64_t seq);
 
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_write);
+  CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2,
                                  _banner_exchange_handle_peer_banner);
 
   Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> *callback);
-  Ct<ProtocolV2> *_banner_exchange_handle_write(int r);
+  Ct<ProtocolV2> *_wait_for_peer_banner();
   Ct<ProtocolV2> *_banner_exchange_handle_peer_banner(char *buffer, int r);
 
   CONTINUATION_DECL(ProtocolV2, read_frame);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_more_write);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
   CONTINUATION_DECL(ProtocolV2, throttle_message);
   CONTINUATION_DECL(ProtocolV2, throttle_bytes);
@@ -527,10 +155,7 @@ private:
   Ct<ProtocolV2> *read_frame();
   Ct<ProtocolV2> *handle_read_frame_length_and_tag(char *buffer, int r);
   Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
-
   Ct<ProtocolV2> *handle_auth_more(char *payload, uint32_t length);
-  Ct<ProtocolV2> *handle_auth_more_write(int r);
-
   Ct<ProtocolV2> *handle_ident(char *payload, uint32_t length);
 
   Ct<ProtocolV2> *ready();
@@ -574,21 +199,19 @@ private:
   // Client Protocol
   CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange);
   CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_write);
 
   Ct<ProtocolV2> *start_client_banner_exchange();
   Ct<ProtocolV2> *post_client_banner_exchange();
-  Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> allowed_methods = {});
-  Ct<ProtocolV2> *handle_auth_request_write(int r);
+  inline Ct<ProtocolV2> *send_auth_request() {
+    std::vector<uint32_t> empty;
+    return send_auth_request(empty);
+  }
+  Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> &allowed_methods);
   Ct<ProtocolV2> *handle_auth_bad_method(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_auth_bad_auth(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_auth_done(char *payload, uint32_t length);
   Ct<ProtocolV2> *send_client_ident();
-  Ct<ProtocolV2> *handle_client_ident_write(int r);
   Ct<ProtocolV2> *send_reconnect();
-  Ct<ProtocolV2> *handle_reconnect_write(int r);
   Ct<ProtocolV2> *handle_ident_missing_features(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_session_reset();
   Ct<ProtocolV2> *handle_session_retry(char *payload, uint32_t length);
@@ -600,36 +223,28 @@ private:
   // Server Protocol
   CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
   CONTINUATION_DECL(ProtocolV2, post_server_banner_exchange);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_method_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_auth_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2,
-                                  handle_ident_missing_features_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_reset_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_retry_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_wait_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_server_ident_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_ok_write);
+  CONTINUATION_DECL(ProtocolV2, server_ready);
 
   Ct<ProtocolV2> *start_server_banner_exchange();
   Ct<ProtocolV2> *post_server_banner_exchange();
+  Ct<ProtocolV2> *handle_cephx_auth(bufferlist &auth_payload);
   Ct<ProtocolV2> *handle_auth_request(char *payload, uint32_t length);
-  Ct<ProtocolV2> *handle_auth_bad_method_write(int r);
-  Ct<ProtocolV2> *handle_auth_bad_auth_write(int r);
-  Ct<ProtocolV2> *handle_auth_done_write(int r);
   Ct<ProtocolV2> *handle_client_ident(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
   Ct<ProtocolV2> *handle_reconnect(char *payload, uint32_t length);
-  Ct<ProtocolV2> *handle_session_reset_write(int r);
-  Ct<ProtocolV2> *handle_session_retry_write(int r);
   Ct<ProtocolV2> *handle_existing_connection(AsyncConnectionRef existing);
-  Ct<ProtocolV2> *handle_wait_write(int r);
   Ct<ProtocolV2> *reuse_connection(AsyncConnectionRef existing,
                                    ProtocolV2 *exproto, bool reconnect);
   Ct<ProtocolV2> *send_server_ident();
-  Ct<ProtocolV2> *handle_server_ident_write(int r);
   Ct<ProtocolV2> *send_reconnect_ok();
-  Ct<ProtocolV2> *handle_reconnect_ok_write(int r);
+  Ct<ProtocolV2> *server_ready();
+
+public:
+  template <class T, typename... Args>
+  friend struct SignedEncryptedFrame;
+
+  // TODO: REMOVE THIS
+  void log(const std::string message, uint64_t val, uint64_t val2);
 };
 
 #endif /* _MSG_ASYNC_PROTOCOL_V2_ */