]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: messange exchange phase
authorRicardo Dias <rdias@suse.com>
Wed, 31 Oct 2018 16:30:32 +0000 (16:30 +0000)
committerRicardo Dias <rdias@suse.com>
Wed, 23 Jan 2019 13:59:24 +0000 (13:59 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/include/msgr.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 0c342fc1e0ac47e86645909973b84e33105659ac..687f4a82b5eda0440d0ce758478d60695aa8ba8b 100644 (file)
@@ -193,6 +193,30 @@ struct ceph_msg_header {
        __le32 crc;       /* header crc32c */
 } __attribute__ ((packed));
 
+struct ceph_msg_header2 {
+       __le64 seq;       /* message seq# for this session */
+       __le64 tid;       /* transaction id */
+       __le16 type;      /* message type */
+       __le16 priority;  /* priority.  higher value == higher priority */
+       __le16 version;   /* version of message encoding */
+
+       __le32 front_len; /* bytes in main payload */
+       __le32 middle_len;/* bytes in middle payload */
+       __le32 data_pre_padding_len;
+       __le32 data_len;  /* bytes of data payload */
+       __le16 data_off;  /* sender: include full offset;
+                            receiver: mask against ~PAGE_MASK */
+
+       __le64 ack_seq;
+
+       __le32 front_crc, middle_crc, data_crc;
+       __u8 flags;
+       /* oldest code we think can decode this.  unknown if zero. */
+       __le16 compat_version;
+       __le16 reserved;
+       __le32 header_crc;
+} __attribute__ ((packed));
+
 #define CEPH_MSG_PRIO_LOW     64
 #define CEPH_MSG_PRIO_DEFAULT 127
 #define CEPH_MSG_PRIO_HIGH    196
index d526210eb2d2497f8345ba8c756b9b9f58fbb787..8cdd9658e49a06ed9184dc0d020c83c192f314cb 100644 (file)
@@ -4,6 +4,7 @@
 #include "ProtocolV2.h"
 #include "AsyncMessenger.h"
 
+#include "common/EventTrace.h"
 #include "common/errno.h"
 #include "include/random.h"
 
@@ -20,6 +21,8 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                 << " l=" << connection->policy.lossy << ").";
 }
 
+const int ASYNC_COALESCE_THRESHOLD = 256;
+
 #define WRITE(B, C) write(CONTINUATION(C), B)
 
 #define READ(L, C) read(CONTINUATION(C), L)
@@ -28,14 +31,34 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
 
 using CtPtr = Ct<ProtocolV2> *;
 
+static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
+  // create a buffer to read into that matches the data alignment
+  unsigned alloc_len = 0;
+  unsigned left = len;
+  unsigned head = 0;
+  if (off & ~CEPH_PAGE_MASK) {
+    // head
+    alloc_len += CEPH_PAGE_SIZE;
+    head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
+    left -= head;
+  }
+  alloc_len += left;
+  bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
+  if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
+  data.push_back(std::move(ptr));
+}
+
 ProtocolV2::ProtocolV2(AsyncConnection *connection)
     : Protocol(2, connection),
       temp_buffer(nullptr),
       state(NONE),
       peer_required_features(0),
       cookie(0),
+      message_seq(0),
+      can_write(false),
       bannerExchangeCallback(nullptr),
-      next_frame_len(0) {
+      next_frame_len(0),
+      keepalive(false) {
   temp_buffer = new char[4096];
 }
 
@@ -45,7 +68,31 @@ void ProtocolV2::connect() { state = START_CONNECT; }
 
 void ProtocolV2::accept() { state = START_ACCEPT; }
 
-bool ProtocolV2::is_connected() { return false; }
+bool ProtocolV2::is_connected() { return can_write; }
+
+/*
+ * Tears down the message queues, and removes them from the
+ * DispatchQueue Must hold write_lock prior to calling.
+ */
+void ProtocolV2::discard_out_queue() {
+  ldout(cct, 10) << __func__ << " started" << dendl;
+
+  for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
+    ldout(cct, 20) << __func__ << " discard " << *p << dendl;
+    (*p)->put();
+  }
+  sent.clear();
+  for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
+           out_queue.begin();
+       p != out_queue.end(); ++p) {
+    for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
+         r != p->second.end(); ++r) {
+      ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
+      r->second->put();
+    }
+  }
+  out_queue.clear();
+}
 
 void ProtocolV2::stop() {
   ldout(cct, 2) << __func__ << dendl;
@@ -55,8 +102,13 @@ void ProtocolV2::stop() {
 
   if (connection->delay_state) connection->delay_state->flush();
 
+  std::lock_guard<std::mutex> l(connection->write_lock);
+
+  discard_out_queue();
+
   connection->_stop();
 
+  can_write = false;
   state = CLOSED;
 }
 
@@ -77,9 +129,70 @@ void ProtocolV2::fault() {
   }
 }
 
-void ProtocolV2::send_message(Message *m) {}
+void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
+                                      bufferlist &bl) {
+  ldout(cct, 20) << __func__ << " m=" << *m << dendl;
+
+  // associate message with Connection (for benefit of encode_payload)
+  if (m->empty_payload()) {
+    ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
+                   << " " << *m << dendl;
+  } else {
+    ldout(cct, 20) << __func__ << " half-reencoding features " << features
+                   << " " << m << " " << *m << dendl;
+  }
 
-void ProtocolV2::send_keepalive() {}
+  // encode and copy out of *m
+  m->encode(features, messenger->crcflags);
+
+  bl.append(m->get_payload());
+  bl.append(m->get_middle());
+  bl.append(m->get_data());
+}
+
+void ProtocolV2::send_message(Message *m) {
+  bufferlist bl;
+  uint64_t f = connection->get_features();
+
+  // TODO: Currently not all messages supports reencode like MOSDMap, so here
+  // only let fast dispatch support messages prepare message
+  bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+  if (can_fast_prepare) {
+    prepare_send_message(f, m, bl);
+  }
+
+  std::lock_guard<std::mutex> l(connection->write_lock);
+  // "features" changes will change the payload encoding
+  if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
+    // ensure the correctness of message encoding
+    bl.clear();
+    m->clear_payload();
+    ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
+                  << " != " << connection->get_features() << dendl;
+  }
+  if (state == CLOSED) {
+    ldout(cct, 10) << __func__ << " connection closed."
+                   << " Drop message " << m << dendl;
+    m->put();
+  } else {
+    m->trace.event("async enqueueing message");
+    out_queue[m->get_priority()].emplace_back(std::move(bl), m);
+    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
+                   << dendl;
+    if (can_write) {
+      connection->center->dispatch_event_external(connection->write_handler);
+    }
+  }
+}
+
+void ProtocolV2::send_keepalive() {
+  ldout(cct, 10) << __func__ << dendl;
+  std::lock_guard<std::mutex> l(connection->write_lock);
+  if (can_write) {
+    keepalive = true;
+    connection->center->dispatch_event_external(connection->write_handler);
+  }
+}
 
 void ProtocolV2::read_event() {
   ldout(cct, 20) << __func__ << dendl;
@@ -91,12 +204,221 @@ void ProtocolV2::read_event() {
     case START_ACCEPT:
       CONTINUATION_RUN(CONTINUATION(start_server_banner_exchange));
       break;
+    case READY:
+      CONTINUATION_RUN(CONTINUATION(read_frame));
+      break;
+    case THROTTLE_MESSAGE:
+      CONTINUATION_RUN(CONTINUATION(throttle_message));
+      break;
+    case THROTTLE_BYTES:
+      CONTINUATION_RUN(CONTINUATION(throttle_bytes));
+      break;
+    case THROTTLE_DISPATCH_QUEUE:
+      CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
+      break;
     default:
       break;
   }
 }
 
-void ProtocolV2::write_event() {}
+Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) {
+  Message *m = 0;
+  if (!out_queue.empty()) {
+    map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
+        out_queue.rbegin();
+    ceph_assert(!it->second.empty());
+    list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
+    m = p->second;
+    if (bl) {
+      bl->swap(p->first);
+    }
+    it->second.erase(p);
+    if (it->second.empty()) {
+      out_queue.erase(it->first);
+    }
+  }
+  return m;
+}
+
+ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
+  FUNCTRACE(cct);
+  ceph_assert(connection->center->in_thread());
+  m->set_seq(++out_seq);
+
+  connection->lock.lock();
+  uint64_t ack_seq = in_seq;
+  ack_left = 0;
+  connection->lock.unlock();
+
+  MessageFrame message(m, bl, ack_seq, messenger->crcflags & MSG_CRC_HEADER);
+
+  ldout(cct, 20) << __func__ << " sending message type=" << message.header2.type
+                 << " src " << entity_name_t(messenger->get_myname())
+                 << " front=" << message.header2.front_len
+                 << " data=" << message.header2.data_len << " off "
+                 << message.header2.data_off << dendl;
+
+  bufferlist &msg_bl = message.get_buffer();
+  connection->outcoming_bl.claim_append(msg_bl);
+
+  m->trace.event("async writing message");
+  ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
+                 << dendl;
+  ssize_t total_send_size = connection->outcoming_bl.length();
+  ssize_t rc = connection->_try_send(more);
+  if (rc < 0) {
+    ldout(cct, 1) << __func__ << " error sending " << m << ", "
+                  << cpp_strerror(rc) << dendl;
+  } else {
+    connection->logger->inc(
+        l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+    ldout(cct, 10) << __func__ << " sending " << m
+                   << (rc ? " continuely." : " done.") << dendl;
+  }
+  if (m->get_type() == CEPH_MSG_OSD_OP)
+    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
+  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
+    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+  m->put();
+
+  return rc;
+}
+
+void ProtocolV2::append_keepalive() {
+  ldout(cct, 10) << __func__ << dendl;
+  KeepAliveFrame keepalive_frame;
+  connection->outcoming_bl.claim_append(keepalive_frame.get_buffer());
+}
+
+void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
+  struct ceph_timespec ts;
+  timestamp.encode_timeval(&ts);
+  KeepAliveFrame keepalive_ack_frame(ts);
+  connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer());
+}
+
+void ProtocolV2::handle_message_ack(uint64_t seq) {
+  if (connection->policy.lossy) {  // lossy connections don't keep sent messages
+    return;
+  }
+
+  ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
+
+  // trim sent list
+  static const int max_pending = 128;
+  int i = 0;
+  Message *pending[max_pending];
+  connection->write_lock.lock();
+  while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
+    Message *m = sent.front();
+    sent.pop_front();
+    pending[i++] = m;
+    ldout(cct, 10) << __func__ << " got ack seq " << seq
+                   << " >= " << m->get_seq() << " on " << m << " " << *m
+                   << dendl;
+  }
+  connection->write_lock.unlock();
+  for (int k = 0; k < i; k++) {
+    pending[k]->put();
+  }
+}
+
+void ProtocolV2::write_event() {
+  ldout(cct, 10) << __func__ << dendl;
+  ssize_t r = 0;
+
+  connection->write_lock.lock();
+  if (can_write) {
+    if (keepalive) {
+      append_keepalive();
+      keepalive = false;
+    }
+
+    auto start = ceph::mono_clock::now();
+    bool more;
+    do {
+      bufferlist data;
+      Message *m = _get_next_outgoing(&data);
+      if (!m) {
+        break;
+      }
+
+      if (!connection->policy.lossy) {
+        // put on sent list
+        sent.push_back(m);
+        m->get();
+      }
+      more = !out_queue.empty();
+      connection->write_lock.unlock();
+
+      // send_message or requeue messages may not encode message
+      if (!data.length()) {
+        prepare_send_message(connection->get_features(), m, data);
+      }
+
+      r = write_message(m, data, more);
+
+      connection->write_lock.lock();
+      if (r == 0) {
+        ;
+      } else if (r < 0) {
+        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
+        break;
+      } else if (r > 0)
+        break;
+    } while (can_write);
+    connection->write_lock.unlock();
+
+    // if r > 0 mean data still lefted, so no need _try_send.
+    if (r == 0) {
+      uint64_t left = ack_left;
+      if (left) {
+        ceph_le64 s;
+        s = in_seq;
+        AckFrame ack(in_seq);
+        connection->outcoming_bl.claim_append(ack.get_buffer());
+        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
+                       << " messages" << dendl;
+        ack_left -= left;
+        left = ack_left;
+        r = connection->_try_send(left);
+      } else if (is_queued()) {
+        r = connection->_try_send();
+      }
+    }
+
+    connection->logger->tinc(l_msgr_running_send_time,
+                             ceph::mono_clock::now() - start);
+    if (r < 0) {
+      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
+      connection->lock.lock();
+      fault();
+      connection->lock.unlock();
+      return;
+    }
+  } else {
+    connection->write_lock.unlock();
+    connection->lock.lock();
+    connection->write_lock.lock();
+    // if (state == STANDBY && !connection->policy.server && is_queued()) {
+    //   ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
+    //   connection->_connect();
+    // } else
+    if (connection->cs && state != NONE && state != CLOSED &&
+        state != START_CONNECT) {
+      r = connection->_try_send();
+      if (r < 0) {
+        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
+        connection->write_lock.unlock();
+        fault();
+        connection->lock.unlock();
+        return;
+      }
+    }
+    connection->write_lock.unlock();
+    connection->lock.unlock();
+  }
+}
 
 bool ProtocolV2::is_queued() { return false; }
 
@@ -252,25 +574,49 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
 }
 
 CtPtr ProtocolV2::read_frame() {
+  if (state == CLOSED) {
+    return nullptr;
+  }
+
   ldout(cct, 20) << __func__ << dendl;
-  return READ(sizeof(__le32), handle_read_frame_length);
+  return READ(sizeof(__le32) * 2, handle_read_frame_length_and_tag);
 }
 
-CtPtr ProtocolV2::handle_read_frame_length(char *buffer, int r) {
+CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " read frame length failed r=" << r << " ("
-                  << cpp_strerror(r) << ")" << dendl;
+    ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
+                  << " (" << cpp_strerror(r) << ")" << dendl;
     return _fault();
   }
 
-  next_frame_len = *(__le32 *)buffer;
+  next_frame_len = *(uint32_t *)buffer - sizeof(uint32_t);
+  next_tag = static_cast<Tag>(*(uint32_t *)(buffer + sizeof(uint32_t)));
 
-  return READ(next_frame_len, handle_frame);
+  ldout(cct, 10) << __func__ << " next frame_len=" << next_frame_len
+                 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
+
+  switch (next_tag) {
+    case Tag::AUTH_REQUEST:
+    case Tag::AUTH_BAD_METHOD:
+    case Tag::AUTH_BAD_AUTH:
+    case Tag::AUTH_MORE:
+    case Tag::AUTH_DONE:
+    case Tag::IDENT:
+    case Tag::IDENT_MISSING_FEATURES:
+    case Tag::KEEPALIVE2:
+    case Tag::KEEPALIVE2_ACK:
+    case Tag::ACK:
+      return READ(next_frame_len, handle_frame_payload);
+    case Tag::MESSAGE:
+      return handle_message();
+  }
+
+  return nullptr;
 }
 
-CtPtr ProtocolV2::handle_frame(char *buffer, int r) {
+CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
@@ -279,27 +625,27 @@ CtPtr ProtocolV2::handle_frame(char *buffer, int r) {
     return _fault();
   }
 
-  Tag tag = static_cast<Tag>(*(__le32 *)buffer);
-  buffer += sizeof(__le32);
-  uint32_t payload_len = next_frame_len - sizeof(__le32);
-
-  ldout(cct, 10) << __func__ << " tag=" << static_cast<uint32_t>(tag) << dendl;
-
-  switch (tag) {
+  switch (next_tag) {
     case Tag::AUTH_REQUEST:
-      return handle_auth_request(buffer, payload_len);
+      return handle_auth_request(buffer, next_frame_len);
     case Tag::AUTH_BAD_METHOD:
-      return handle_auth_bad_method(buffer, payload_len);
+      return handle_auth_bad_method(buffer, next_frame_len);
     case Tag::AUTH_BAD_AUTH:
-      return handle_auth_bad_auth(buffer, payload_len);
+      return handle_auth_bad_auth(buffer, next_frame_len);
     case Tag::AUTH_MORE:
-      return handle_auth_more(buffer, payload_len);
+      return handle_auth_more(buffer, next_frame_len);
     case Tag::AUTH_DONE:
-      return handle_auth_done(buffer, payload_len);
+      return handle_auth_done(buffer, next_frame_len);
     case Tag::IDENT:
-      return handle_ident(buffer, payload_len);
+      return handle_ident(buffer, next_frame_len);
     case Tag::IDENT_MISSING_FEATURES:
-      return handle_ident_missing_features(buffer, payload_len);
+      return handle_ident_missing_features(buffer, next_frame_len);
+    case Tag::KEEPALIVE2:
+      return handle_keepalive2(buffer, next_frame_len);
+    case Tag::KEEPALIVE2_ACK:
+      return handle_keepalive2_ack(buffer, next_frame_len);
+    case Tag::ACK:
+      return handle_message_ack(buffer, next_frame_len);
     default:
       ceph_abort();
   }
@@ -334,14 +680,14 @@ CtPtr ProtocolV2::handle_auth_more(char *payload, uint32_t length) {
     encode(hello, auth_bl, 0);
     /* END TO REMOVE */
     AuthMoreFrame more(auth_bl);
-    bufferlist bl = more.to_bufferlist();
+    bufferlist &bl = more.get_buffer();
     return WRITE(bl, handle_auth_more_write);
   }
   /* END TO REMOVE */
 
   AuthDoneFrame auth_done(0);
 
-  auto bl = auth_done.to_bufferlist();
+  bufferlist &bl = auth_done.get_buffer();
   return WRITE(bl, handle_auth_done_write);
 }
 
@@ -367,6 +713,491 @@ CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) {
   ceph_abort("wrong state at handle_ident");
 }
 
+CtPtr ProtocolV2::ready() {
+  ldout(cct, 25) << __func__ << dendl;
+
+  // make sure no pending tick timer
+  if (connection->last_tick_id) {
+    connection->center->delete_time_event(connection->last_tick_id);
+  }
+  connection->last_tick_id = connection->center->create_time_event(
+      connection->inactive_timeout_us, connection->tick_handler);
+
+  {
+    std::lock_guard<std::mutex> l(connection->write_lock);
+    can_write = true;
+    if (!out_queue.empty()) {
+      connection->center->dispatch_event_external(connection->write_handler);
+    }
+  }
+
+  state = READY;
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_message() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  ceph_assert(state == READY);
+
+  return READ(sizeof(ceph_msg_header2), handle_message_header);
+}
+
+CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " read message header failed" << dendl;
+    return _fault();
+  }
+
+  ceph_msg_header2 header;
+  header = *((ceph_msg_header2 *)buffer);
+
+  entity_name_t src(connection->peer_type, connection->peer_global_id);
+
+  ldout(cct, 20) << __func__ << " got envelope type=" << header.type << " src "
+                 << src << " front=" << header.front_len
+                 << " data=" << header.data_len << " off " << header.data_off
+                 << dendl;
+
+  if (messenger->crcflags & MSG_CRC_HEADER) {
+    __u32 header_crc = 0;
+    header_crc = ceph_crc32c(0, (unsigned char *)&header,
+                             sizeof(header) - sizeof(header.header_crc));
+    // verify header crc
+    if (header_crc != header.header_crc) {
+      ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
+                    << " != " << header.header_crc << dendl;
+      return _fault();
+    }
+  }
+
+  // Reset state
+  data_buf.clear();
+  front.clear();
+  middle.clear();
+  data.clear();
+  current_header = header;
+
+  state = THROTTLE_MESSAGE;
+  return CONTINUE(throttle_message);
+}
+
+CtPtr ProtocolV2::throttle_message() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  if (connection->policy.throttler_messages) {
+    ldout(cct, 10) << __func__ << " wants " << 1
+                   << " message from policy throttler "
+                   << connection->policy.throttler_messages->get_current()
+                   << "/" << connection->policy.throttler_messages->get_max()
+                   << dendl;
+    if (!connection->policy.throttler_messages->get_or_fail()) {
+      ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
+                     << connection->policy.throttler_messages->get_current()
+                     << "/" << connection->policy.throttler_messages->get_max()
+                     << " failed, just wait." << dendl;
+      // following thread pool deal with th full message queue isn't a
+      // short time, so we can wait a ms.
+      if (connection->register_time_events.empty()) {
+        connection->register_time_events.insert(
+            connection->center->create_time_event(1000,
+                                                  connection->wakeup_handler));
+      }
+      return nullptr;
+    }
+  }
+
+  state = THROTTLE_BYTES;
+  return CONTINUE(throttle_bytes);
+}
+
+CtPtr ProtocolV2::throttle_bytes() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+                          current_header.data_len;
+  if (cur_msg_size) {
+    if (connection->policy.throttler_bytes) {
+      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+                     << " bytes from policy throttler "
+                     << connection->policy.throttler_bytes->get_current() << "/"
+                     << connection->policy.throttler_bytes->get_max() << dendl;
+      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
+        ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+                       << " bytes from policy throttler "
+                       << connection->policy.throttler_bytes->get_current()
+                       << "/" << connection->policy.throttler_bytes->get_max()
+                       << " failed, just wait." << dendl;
+        // following thread pool deal with th full message queue isn't a
+        // short time, so we can wait a ms.
+        if (connection->register_time_events.empty()) {
+          connection->register_time_events.insert(
+              connection->center->create_time_event(
+                  1000, connection->wakeup_handler));
+        }
+        return nullptr;
+      }
+    }
+  }
+
+  state = THROTTLE_DISPATCH_QUEUE;
+  return CONTINUE(throttle_dispatch_queue);
+}
+
+CtPtr ProtocolV2::throttle_dispatch_queue() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+                          current_header.data_len;
+
+  if (cur_msg_size) {
+    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
+            cur_msg_size)) {
+      ldout(cct, 10)
+          << __func__ << " wants " << cur_msg_size
+          << " bytes from dispatch throttle "
+          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
+          << connection->dispatch_queue->dispatch_throttler.get_max()
+          << " failed, just wait." << dendl;
+      // following thread pool deal with th full message queue isn't a
+      // short time, so we can wait a ms.
+      if (connection->register_time_events.empty()) {
+        connection->register_time_events.insert(
+            connection->center->create_time_event(1000,
+                                                  connection->wakeup_handler));
+      }
+      return nullptr;
+    }
+  }
+
+  throttle_stamp = ceph_clock_now();
+
+  state = READ_MESSAGE_FRONT;
+  return read_message_front();
+}
+
+CtPtr ProtocolV2::read_message_front() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  unsigned front_len = current_header.front_len;
+  if (front_len) {
+    if (!front.length()) {
+      front.push_back(buffer::create(front_len));
+    }
+    return READB(front_len, front.c_str(), handle_message_front);
+  }
+  return read_message_middle();
+}
+
+CtPtr ProtocolV2::handle_message_front(char *buffer, int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " read message front failed" << dendl;
+    return _fault();
+  }
+
+  ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
+
+  return read_message_middle();
+}
+
+CtPtr ProtocolV2::read_message_middle() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  if (current_header.middle_len) {
+    if (!middle.length()) {
+      middle.push_back(buffer::create(current_header.middle_len));
+    }
+    return READB(current_header.middle_len, middle.c_str(),
+                 handle_message_middle);
+  }
+
+  return read_message_data_prepare();
+}
+
+CtPtr ProtocolV2::handle_message_middle(char *buffer, int r) {
+  ldout(cct, 20) << __func__ << " r" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
+    return _fault();
+  }
+
+  ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
+
+  return read_message_data_prepare();
+}
+
+CtPtr ProtocolV2::read_message_data_prepare() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  unsigned data_len = le32_to_cpu(current_header.data_len);
+  unsigned data_off = le32_to_cpu(current_header.data_off);
+
+  if (data_len) {
+    // get a buffer
+    map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
+        connection->rx_buffers.find(current_header.tid);
+    if (p != connection->rx_buffers.end()) {
+      ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
+                     << " at offset " << data_off << " len "
+                     << p->second.first.length() << dendl;
+      data_buf = p->second.first;
+      // make sure it's big enough
+      if (data_buf.length() < data_len)
+        data_buf.push_back(buffer::create(data_len - data_buf.length()));
+      data_blp = data_buf.begin();
+    } else {
+      ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
+                     << data_off << dendl;
+      alloc_aligned_buffer(data_buf, data_len, data_off);
+      data_blp = data_buf.begin();
+    }
+  }
+
+  msg_left = data_len;
+
+  return CONTINUE(read_message_data);
+}
+
+CtPtr ProtocolV2::read_message_data() {
+  ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
+
+  if (msg_left > 0) {
+    bufferptr bp = data_blp.get_current_ptr();
+    unsigned read_len = std::min(bp.length(), msg_left);
+
+    return READB(read_len, bp.c_str(), handle_message_data);
+  }
+
+  state = READ_MESSAGE_COMPLETE;
+  return handle_message_complete();
+}
+
+CtPtr ProtocolV2::handle_message_data(char *buffer, int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " read data error " << dendl;
+    return _fault();
+  }
+
+  bufferptr bp = data_blp.get_current_ptr();
+  unsigned read_len = std::min(bp.length(), msg_left);
+  ceph_assert(read_len < std::numeric_limits<int>::max());
+  data_blp.advance(read_len);
+  data.append(bp, 0, read_len);
+  msg_left -= read_len;
+
+  return CONTINUE(read_message_data);
+}
+
+CtPtr ProtocolV2::handle_message_complete() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  ldout(cct, 20) << __func__ << " got " << front.length() << " + "
+                 << middle.length() << " + " << data.length() << " byte message"
+                 << dendl;
+
+  ceph_msg_header header{
+      current_header.seq,
+      current_header.tid,
+      current_header.type,
+      current_header.priority,
+      current_header.version,
+      current_header.front_len,
+      current_header.middle_len,
+      current_header.data_len,
+      current_header.data_off,
+      entity_name_t(connection->peer_type, connection->peer_global_id),
+      current_header.compat_version,
+      current_header.reserved,
+      0};
+  ceph_msg_footer footer{current_header.front_crc, current_header.middle_crc,
+                         current_header.data_crc, 0, current_header.flags};
+
+  Message *message = decode_message(cct, messenger->crcflags, header, footer,
+                                    front, middle, data, connection);
+  if (!message) {
+    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
+    return _fault();
+  }
+
+  //
+  //  Check the signature if one should be present.  A zero return indicates
+  //  success. PLR
+  //
+
+  // if (session_security.get() == NULL) {
+  //   ldout(cct, 10) << __func__ << " no session security set" << dendl;
+  // } else {
+  //   if (session_security->check_message_signature(message)) {
+  //     ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
+  //     message->put();
+  //     return _fault();
+  //   }
+  // }
+  message->set_byte_throttler(connection->policy.throttler_bytes);
+  message->set_message_throttler(connection->policy.throttler_messages);
+
+  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+                          current_header.data_len;
+
+  // store reservation size in message, so we don't get confused
+  // by messages entering the dispatch queue through other paths.
+  message->set_dispatch_throttle_size(cur_msg_size);
+
+  message->set_recv_stamp(recv_stamp);
+  message->set_throttle_stamp(throttle_stamp);
+  message->set_recv_complete_stamp(ceph_clock_now());
+
+  // check received seq#.  if it is old, drop the message.
+  // note that incoming messages may skip ahead.  this is convenient for the
+  // client side queueing because messages can't be renumbered, but the (kernel)
+  // client will occasionally pull a message out of the sent queue to send
+  // elsewhere.  in that case it doesn't matter if we "got" it or not.
+  uint64_t cur_seq = in_seq;
+  if (message->get_seq() <= cur_seq) {
+    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
+                  << " <= " << cur_seq << " " << message << " " << *message
+                  << ", discarding" << dendl;
+    message->put();
+    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
+        cct->_conf->ms_die_on_old_message) {
+      ceph_assert(0 == "old msgs despite reconnect_seq feature");
+    }
+    return nullptr;
+  }
+  if (message->get_seq() > cur_seq + 1) {
+    ldout(cct, 0) << __func__ << " missed message?  skipped from seq "
+                  << cur_seq << " to " << message->get_seq() << dendl;
+    if (cct->_conf->ms_die_on_skipped_message) {
+      ceph_assert(0 == "skipped incoming seq");
+    }
+  }
+
+  message->set_connection(connection);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  if (message->get_type() == CEPH_MSG_OSD_OP ||
+      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
+    utime_t ltt_processed_stamp = ceph_clock_now();
+    double usecs_elapsed =
+        (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
+    ostringstream buf;
+    if (message->get_type() == CEPH_MSG_OSD_OP)
+      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
+                           false);
+    else
+      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
+                           false);
+  }
+#endif
+
+  // note last received message.
+  in_seq = message->get_seq();
+  ldout(cct, 5) << " rx " << message->get_source() << " seq "
+                << message->get_seq() << " " << message << " " << *message
+                << dendl;
+
+  bool need_dispatch_writer = true;
+  if (!connection->policy.lossy) {
+    ack_left++;
+    need_dispatch_writer = true;
+  }
+
+  state = READY;
+
+  connection->logger->inc(l_msgr_recv_messages);
+  connection->logger->inc(
+      l_msgr_recv_bytes,
+      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+
+  messenger->ms_fast_preprocess(message);
+  auto fast_dispatch_time = ceph::mono_clock::now();
+  connection->logger->tinc(l_msgr_running_recv_time,
+                           fast_dispatch_time - connection->recv_start_time);
+  if (connection->delay_state) {
+    double delay_period = 0;
+    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
+      delay_period =
+          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+      ldout(cct, 1) << "queue_received will delay after "
+                    << (ceph_clock_now() + delay_period) << " on " << message
+                    << " " << *message << dendl;
+    }
+    connection->delay_state->queue(delay_period, message);
+  } else if (messenger->ms_can_fast_dispatch(message)) {
+    connection->lock.unlock();
+    connection->dispatch_queue->fast_dispatch(message);
+    connection->recv_start_time = ceph::mono_clock::now();
+    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
+                             connection->recv_start_time - fast_dispatch_time);
+    connection->lock.lock();
+  } else {
+    connection->dispatch_queue->enqueue(message, message->get_priority(),
+                                        connection->conn_id);
+  }
+
+  handle_message_ack(current_header.ack_seq);
+
+  // clean up local buffer references
+  data_buf.clear();
+  front.clear();
+  middle.clear();
+  data.clear();
+
+  if (need_dispatch_writer && connection->is_connected()) {
+    connection->center->dispatch_event_external(connection->write_handler);
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  KeepAliveFrame keepalive_frame(payload, length);
+
+  ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
+
+  utime_t kp_t = utime_t(keepalive_frame.timestamp);
+  connection->write_lock.lock();
+  append_keepalive_ack(kp_t);
+  connection->write_lock.unlock();
+
+  ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
+  connection->set_last_keepalive(ceph_clock_now());
+
+  if (is_connected()) {
+    connection->center->dispatch_event_external(connection->write_handler);
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  KeepAliveFrame keepalive_ack_frame(payload, length);
+  connection->set_last_keepalive_ack(utime_t(keepalive_ack_frame.timestamp));
+  ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  AckFrame ack(payload, length);
+  handle_message_ack(ack.seq);
+  return CONTINUE(read_frame);
+}
+
 /* Client Protocol Methods */
 
 CtPtr ProtocolV2::start_client_banner_exchange() {
@@ -407,7 +1238,7 @@ CtPtr ProtocolV2::send_auth_request(std::vector<__u32> allowed_methods) {
   }
   AuthRequestFrame authFrame(method, auth_bl);
 
-  bufferlist bl = authFrame.to_bufferlist();
+  bufferlist &bl = authFrame.get_buffer();
   return WRITE(bl, handle_auth_request_write);
 }
 
@@ -477,7 +1308,7 @@ CtPtr ProtocolV2::send_client_ident() {
                 << " flags: " << ident.flags << " cookie: " << std::dec
                 << ident.cookie << dendl;
 
-  bufferlist bl = ident.to_bufferlist();
+  bufferlist &bl = ident.get_buffer();
   return WRITE(bl, handle_client_ident_write);
 }
 
@@ -523,9 +1354,7 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
   connection->set_features(server_ident.required_features &
                            connection->policy.features_supported);
 
-  state = READY;
-
-  return CONTINUE(read_frame);
+  return ready();
 }
 
 /* Server Protocol Methods */
@@ -594,7 +1423,7 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
     ldout(cct, 1) << __func__ << " auth method=" << auth_request.method
                   << " not allowed" << dendl;
     AuthBadMethodFrame bad_method(auth_request.method, allowed_methods);
-    bufferlist bl = bad_method.to_bufferlist();
+    bufferlist &bl = bad_method.get_buffer();
     return WRITE(bl, handle_auth_bad_method_write);
   }
 
@@ -605,7 +1434,7 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
 
   if (!valid) {
     AuthBadAuthFrame bad_auth(12, "Permission denied");
-    bufferlist bl = bad_auth.to_bufferlist();
+    bufferlist &bl = bad_auth.get_buffer();
     return WRITE(bl, handle_auth_bad_auth_write);
   }
 
@@ -616,7 +1445,7 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) {
   encode(hello, auth_bl, 0);
   /* END TO REMOVE */
   AuthMoreFrame more(auth_bl);
-  bufferlist bl = more.to_bufferlist();
+  bufferlist &bl = more.get_buffer();
   return WRITE(bl, handle_auth_more_write);
 }
 
@@ -685,8 +1514,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
                   << feat_missing << std::dec << dendl;
     IdentMissingFeaturesFrame ident_missing_features(feat_missing);
 
-    bufferlist bl;
-    bl = ident_missing_features.to_bufferlist();
+    bufferlist &bl = ident_missing_features.get_buffer();
     return WRITE(bl, handle_ident_missing_features_write);
   }
 
@@ -710,7 +1538,34 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
                 << " flags: " << ident.flags << " cookie: " << std::dec
                 << ident.cookie << dendl;
 
-  bufferlist bl = ident.to_bufferlist();
+  connection->lock.unlock();
+  // Because "replacing" will prevent other connections preempt this addr,
+  // it's safe that here we don't acquire Connection's lock
+  ssize_t r = messenger->accept_conn(connection);
+
+  connection->inject_delay();
+
+  connection->lock.lock();
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
+                  << connection->peer_addrs.msgr2_addr()
+                  << " just fail later one(this)" << dendl;
+    ldout(cct, 10) << "accept fault after register" << dendl;
+    connection->inject_delay();
+    return _fault();
+  }
+  if (state != ACCEPTING) {
+    ldout(cct, 1) << __func__
+                  << " state changed while accept_conn, it must be mark_down"
+                  << dendl;
+    ceph_assert(state == CLOSED || state == NONE);
+    ldout(cct, 10) << "accept fault after register" << dendl;
+    connection->inject_delay();
+    return _fault();
+  }
+
+  bufferlist &bl = ident.get_buffer();
   return WRITE(bl, handle_send_server_ident_write);
 }
 
@@ -723,6 +1578,10 @@ CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
     return _fault();
   }
 
+  // notify
+  connection->dispatch_queue->queue_accept(connection);
+  messenger->ms_deliver_handle_fast_accept(connection);
+
   return CONTINUE(read_frame);
 }
 
@@ -735,7 +1594,5 @@ CtPtr ProtocolV2::handle_send_server_ident_write(int r) {
     return _fault();
   }
 
-  state = READY;
-
-  return CONTINUE(read_frame);
+  return ready();
 }
index e8a7b60b0af173d1c912b7717fb127a016b3845b..7de850756d0bf4034cfe439c57ba00741bba15de 100644 (file)
@@ -15,17 +15,31 @@ private:
     START_ACCEPT,
     ACCEPTING,
     READY,
+    THROTTLE_MESSAGE,
+    THROTTLE_BYTES,
+    THROTTLE_DISPATCH_QUEUE,
+    READ_MESSAGE_FRONT,
+    READ_MESSAGE_COMPLETE,
     CLOSED
   };
 
   static const char *get_state_name(int state) {
-    const char *const statenames[] = {"NONE",       "START_CONNECT",
-                                      "CONNECTING", "START_ACCEPT",
-                                      "ACCEPTING",  "CLOSED"};
+    const char *const statenames[] = {"NONE",
+                                      "START_CONNECT",
+                                      "CONNECTING",
+                                      "START_ACCEPT",
+                                      "ACCEPTING",
+                                      "READY",
+                                      "THROTTLE_MESSAGE",
+                                      "THROTTLE_BYTES",
+                                      "THROTTLE_DISPATCH_QUEUE",
+                                      "READ_MESSAGE_FRONT",
+                                      "READ_MESSAGE_COMPLETE",
+                                      "CLOSED"};
     return statenames[state];
   }
 
-  enum class Tag : __le32 {
+  enum class Tag : uint32_t {
     AUTH_REQUEST,
     AUTH_BAD_METHOD,
     AUTH_BAD_AUTH,
@@ -33,40 +47,46 @@ private:
     AUTH_DONE,
     IDENT,
     IDENT_MISSING_FEATURES,
+    MESSAGE,
+    KEEPALIVE2,
+    KEEPALIVE2_ACK,
+    ACK
   };
 
   struct Frame {
-    __le32 frame_len;
-    __le32 tag;
+    uint32_t tag;
     bufferlist payload;
+    bufferlist frame_buffer;
 
-    Frame(Tag tag, __le32 payload_len)
-        : frame_len(sizeof(__le32) + payload_len),
-          tag(static_cast<__le32>(tag)) {}
+    Frame(Tag tag) : tag(static_cast<uint32_t>(tag)) {
+      encode(this->tag, payload, 0);
+    }
 
-    bufferlist to_bufferlist() {
-      ceph_assert(payload.length() == (frame_len - sizeof(__le32)));
-      bufferlist bl;
-      encode(frame_len, bl, 0);
-      encode(tag, bl, 0);
-      bl.claim_append(payload);
-      return bl;
+    Frame() {}
+
+    bufferlist &get_buffer() {
+      if (frame_buffer.length()) {
+        return frame_buffer;
+      }
+      encode((uint32_t)payload.length(), frame_buffer, 0);
+      frame_buffer.claim_append(payload);
+      return frame_buffer;
     }
   };
 
   struct SignedEncryptedFrame : public Frame {
-    SignedEncryptedFrame(Tag tag, __le32 payload_len)
-        : Frame(tag, payload_len) {}
-    bufferlist to_bufferlist() { return Frame::to_bufferlist(); }
+    SignedEncryptedFrame(Tag tag) : Frame(tag) {}
+    SignedEncryptedFrame() : Frame() {}
+    bufferlist &get_buffer() { return Frame::get_buffer(); }
   };
 
   struct AuthRequestFrame : public Frame {
-    __le32 method;
-    __le32 len;
+    uint32_t method;
+    uint32_t len;
     bufferlist auth_payload;
 
-    AuthRequestFrame(__le32 method, bufferlist &auth_payload)
-        : Frame(Tag::AUTH_REQUEST, sizeof(__le32) * 2 + auth_payload.length()),
+    AuthRequestFrame(uint32_t method, bufferlist &auth_payload)
+        : Frame(Tag::AUTH_REQUEST),
           method(method),
           len(auth_payload.length()),
           auth_payload(auth_payload) {
@@ -75,92 +95,87 @@ private:
       payload.claim_append(auth_payload);
     }
 
-    AuthRequestFrame(char *payload, uint32_t length)
-        : Frame(Tag::AUTH_REQUEST, length) {
-      method = *(__le32 *)payload;
-      len = *(__le32 *)(payload + sizeof(__le32));
-      ceph_assert((length - (sizeof(__le32) * 2)) == len);
-      auth_payload.append((payload + (sizeof(__le32) * 2)), len);
+    AuthRequestFrame(char *payload, uint32_t length) : Frame() {
+      method = *(uint32_t *)payload;
+      len = *(uint32_t *)(payload + sizeof(uint32_t));
+      ceph_assert((length - (sizeof(uint32_t) * 2)) == len);
+      auth_payload.append((payload + (sizeof(uint32_t) * 2)), len);
     }
   };
 
   struct AuthBadMethodFrame : public Frame {
-    __le32 method;
+    uint32_t method;
     std::vector<__u32> allowed_methods;
 
-    AuthBadMethodFrame(__le32 method, std::vector<__u32> methods)
-        : Frame(Tag::AUTH_BAD_METHOD, sizeof(__le32) * (2 + methods.size())),
+    AuthBadMethodFrame(uint32_t method, std::vector<__u32> methods)
+        : Frame(Tag::AUTH_BAD_METHOD),
           method(method),
           allowed_methods(methods) {
       encode(method, payload, 0);
-      encode((__le32)allowed_methods.size(), payload, 0);
+      encode((uint32_t)allowed_methods.size(), payload, 0);
       for (const auto &a_meth : allowed_methods) {
         encode(a_meth, payload, 0);
       }
     }
 
-    AuthBadMethodFrame(char *payload, uint32_t length)
-        : Frame(Tag::AUTH_BAD_METHOD, length) {
-      method = *(__le32 *)payload;
-      __le32 num_methods = *(__le32 *)(payload + sizeof(__le32));
-      payload += sizeof(__le32) * 2;
+    AuthBadMethodFrame(char *payload, uint32_t length) : Frame() {
+      method = *(uint32_t *)payload;
+      uint32_t num_methods = *(uint32_t *)(payload + sizeof(uint32_t));
+      payload += sizeof(uint32_t) * 2;
       for (unsigned i = 0; i < num_methods; ++i) {
-        allowed_methods.push_back(*(__le32 *)(payload + sizeof(__le32) * i));
+        allowed_methods.push_back(
+            *(uint32_t *)(payload + sizeof(uint32_t) * i));
       }
     }
   };
 
   struct AuthBadAuthFrame : public Frame {
-    __le32 error_code;
+    uint32_t error_code;
     std::string error_msg;
 
-    AuthBadAuthFrame(__le32 error_code, std::string error_msg)
-        : Frame(Tag::AUTH_BAD_AUTH, sizeof(__le32) * 2 + error_msg.size()),
+    AuthBadAuthFrame(uint32_t error_code, std::string error_msg)
+        : Frame(Tag::AUTH_BAD_AUTH),
           error_code(error_code),
           error_msg(error_msg) {
       encode(error_code, payload, 0);
       encode(error_msg, payload, 0);
     }
 
-    AuthBadAuthFrame(char *payload, uint32_t length)
-        : Frame(Tag::AUTH_BAD_AUTH, length) {
-      error_code = *(__le32 *)payload;
-      __le32 len = *(__le32 *)(payload + sizeof(__le32));
-      error_msg = std::string(payload + sizeof(__le32) * 2, len);
+    AuthBadAuthFrame(char *payload, uint32_t length) : Frame() {
+      error_code = *(uint32_t *)payload;
+      uint32_t len = *(uint32_t *)(payload + sizeof(uint32_t));
+      error_msg = std::string(payload + sizeof(uint32_t) * 2, len);
     }
   };
 
   struct AuthMoreFrame : public Frame {
-    __le32 len;
+    uint32_t len;
     bufferlist auth_payload;
 
     AuthMoreFrame(bufferlist &auth_payload)
-        : Frame(Tag::AUTH_MORE, sizeof(__le32) + auth_payload.length()),
+        : Frame(Tag::AUTH_MORE),
           len(auth_payload.length()),
           auth_payload(auth_payload) {
       encode(len, payload, 0);
       payload.claim_append(auth_payload);
     }
 
-    AuthMoreFrame(char *payload, uint32_t length)
-        : Frame(Tag::AUTH_BAD_AUTH, length) {
-      len = *(__le32 *)payload;
-      ceph_assert((length - sizeof(__le32)) == len);
-      auth_payload.append(payload + sizeof(__le32), len);
+    AuthMoreFrame(char *payload, uint32_t length) : Frame() {
+      len = *(uint32_t *)payload;
+      ceph_assert((length - sizeof(uint32_t)) == len);
+      auth_payload.append(payload + sizeof(uint32_t), len);
     }
   };
 
   struct AuthDoneFrame : public Frame {
-    __le64 flags;
+    uint64_t flags;
 
-    AuthDoneFrame(uint64_t flags)
-        : Frame(Tag::AUTH_DONE, sizeof(__le64)), flags(flags) {
+    AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE), flags(flags) {
       encode(flags, payload, 0);
     }
 
-    AuthDoneFrame(char *payload, uint32_t length)
-        : Frame(Tag::AUTH_DONE, length) {
-      flags = *(__le64 *)payload;
+    AuthDoneFrame(char *payload, uint32_t length) : Frame() {
+      flags = *(uint64_t *)payload;
     }
   };
 
@@ -174,7 +189,7 @@ private:
 
     IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features,
                uint64_t required_features, uint64_t flags, uint64_t cookie)
-        : SignedEncryptedFrame(Tag::IDENT, 0),
+        : SignedEncryptedFrame(Tag::IDENT),
           addrs(addrs),
           gid(gid),
           supported_features(supported_features),
@@ -187,13 +202,11 @@ private:
       encode(required_features, payload, -1ll);
       encode(flags, payload, -1ll);
       encode(cookie, payload, -1ll);
-      frame_len = sizeof(uint32_t) + payload.length();
     }
 
-    IdentFrame(char *payload, uint32_t length)
-        : SignedEncryptedFrame(Tag::IDENT, length) {
+    IdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
       bufferlist bl;
-      bl.append(payload, length);
+      bl.push_back(buffer::create_static(length, payload));
       try {
         auto ti = bl.cbegin();
         decode(addrs, ti);
@@ -211,25 +224,117 @@ private:
     __le64 features;
 
     IdentMissingFeaturesFrame(uint64_t features)
-        : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, sizeof(uint64_t)),
+        : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES),
           features(features) {
       encode(features, payload, -1ll);
     }
 
     IdentMissingFeaturesFrame(char *payload, uint32_t length)
-        : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, length) {
+        : SignedEncryptedFrame() {
       features = *(uint64_t *)payload;
     }
   };
 
+  struct MessageFrame : public SignedEncryptedFrame {
+    const unsigned int ASYNC_COALESCE_THRESHOLD = 256;
+
+    ceph_msg_header2 header2;
+
+    MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq,
+                 bool calc_crc)
+        : SignedEncryptedFrame(Tag::MESSAGE) {
+      ceph_msg_header &header = msg->get_header();
+      ceph_msg_footer &footer = msg->get_footer();
+
+      header2 = ceph_msg_header2{header.seq,        header.tid,
+                                 header.type,       header.priority,
+                                 header.version,    header.front_len,
+                                 header.middle_len, 0,
+                                 header.data_len,   header.data_off,
+                                 ack_seq,           footer.front_crc,
+                                 footer.middle_crc, footer.data_crc,
+                                 footer.flags,      header.compat_version,
+                                 header.reserved,   0};
+
+      if (calc_crc) {
+        header2.header_crc =
+            ceph_crc32c(0, (unsigned char *)&header2,
+                        sizeof(header2) - sizeof(header2.header_crc));
+      }
+
+      payload.append((char *)&header2, sizeof(header2));
+      if ((data.length() <= ASYNC_COALESCE_THRESHOLD) &&
+          (data.buffers().size() > 1)) {
+        for (const auto &pb : data.buffers()) {
+          payload.append((char *)pb.c_str(), pb.length());
+        }
+      } else {
+        payload.claim_append(data);
+      }
+    }
+  };
+
+  struct KeepAliveFrame : public SignedEncryptedFrame {
+    struct ceph_timespec timestamp;
+
+    KeepAliveFrame() : SignedEncryptedFrame(Tag::KEEPALIVE2) {
+      struct ceph_timespec ts;
+      utime_t t = ceph_clock_now();
+      t.encode_timeval(&ts);
+      payload.append((char *)&ts, sizeof(ts));
+    }
+
+    KeepAliveFrame(struct ceph_timespec &timestamp)
+        : SignedEncryptedFrame(Tag::KEEPALIVE2_ACK) {
+      payload.append((char *)&timestamp, sizeof(timestamp));
+    }
+
+    KeepAliveFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      ceph_assert(length == sizeof(timestamp));
+      timestamp = *(struct ceph_timespec *)payload;
+    }
+  };
+
+  struct AckFrame : public SignedEncryptedFrame {
+    uint64_t seq;
+
+    AckFrame(uint64_t seq) : SignedEncryptedFrame(Tag::ACK) {
+      encode(seq, payload, 0);
+    }
+
+    AckFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      seq = *(uint64_t *)payload;
+    }
+  };
+
   char *temp_buffer;
   State state;
   uint64_t peer_required_features;
   uint64_t cookie;
+  uint64_t message_seq;
+  bool can_write;
+  std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
+  std::list<Message *> sent;
+  std::atomic<uint64_t> out_seq{0};
+  std::atomic<uint64_t> in_seq{0};
+  std::atomic<uint64_t> ack_left{0};
 
   using ProtFuncPtr = void (ProtocolV2::*)();
   Ct<ProtocolV2> *bannerExchangeCallback;
 
+  uint32_t next_frame_len;
+  Tag next_tag;
+  ceph_msg_header2 current_header;
+  utime_t backoff;  // backoff time
+  utime_t recv_stamp;
+  utime_t throttle_stamp;
+  unsigned msg_left;
+  bufferlist data_buf;
+  bufferlist::iterator data_blp;
+  bufferlist front, middle, data;
+
+  bool keepalive;
+
   ostream &_conn_prefix(std::ostream *_dout);
 
   Ct<ProtocolV2> *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
@@ -242,6 +347,14 @@ private:
     return nullptr;
   }
 
+  void discard_out_queue();
+  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
+  Message *_get_next_outgoing(bufferlist *bl);
+  ssize_t write_message(Message *m, bufferlist &bl, bool more);
+  void append_keepalive();
+  void append_keepalive_ack(utime_t &timestamp);
+  void handle_message_ack(uint64_t seq);
+
   WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_write);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2,
                                  _banner_exchange_handle_peer_banner);
@@ -250,19 +363,49 @@ private:
   Ct<ProtocolV2> *_banner_exchange_handle_write(int r);
   Ct<ProtocolV2> *_banner_exchange_handle_peer_banner(char *buffer, int r);
 
-  uint32_t next_frame_len;
   CONTINUATION_DECL(ProtocolV2, read_frame);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload);
   WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_more_write);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
+  CONTINUATION_DECL(ProtocolV2, throttle_message);
+  CONTINUATION_DECL(ProtocolV2, throttle_bytes);
+  CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_front);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_middle);
+  CONTINUATION_DECL(ProtocolV2, read_message_data);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data);
 
   Ct<ProtocolV2> *read_frame();
-  Ct<ProtocolV2> *handle_read_frame_length(char *buffer, int r);
-  Ct<ProtocolV2> *handle_frame(char *buffer, int r);
+  Ct<ProtocolV2> *handle_read_frame_length_and_tag(char *buffer, int r);
+  Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
+
   Ct<ProtocolV2> *handle_auth_more(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_auth_more_write(int r);
+
   Ct<ProtocolV2> *handle_ident(char *payload, uint32_t length);
 
+  Ct<ProtocolV2> *ready();
+
+  Ct<ProtocolV2> *handle_message();
+  Ct<ProtocolV2> *handle_message_header(char *buffer, int r);
+  Ct<ProtocolV2> *throttle_message();
+  Ct<ProtocolV2> *throttle_bytes();
+  Ct<ProtocolV2> *throttle_dispatch_queue();
+  Ct<ProtocolV2> *read_message_front();
+  Ct<ProtocolV2> *handle_message_front(char *buffer, int r);
+  Ct<ProtocolV2> *read_message_middle();
+  Ct<ProtocolV2> *handle_message_middle(char *buffer, int r);
+  Ct<ProtocolV2> *read_message_data_prepare();
+  Ct<ProtocolV2> *read_message_data();
+  Ct<ProtocolV2> *handle_message_data(char *buffer, int r);
+  Ct<ProtocolV2> *handle_message_complete();
+
+  Ct<ProtocolV2> *handle_keepalive2(char *payload, uint32_t length);
+  Ct<ProtocolV2> *handle_keepalive2_ack(char *payload, uint32_t length);
+
+  Ct<ProtocolV2> *handle_message_ack(char *payload, uint32_t length);
+
 public:
   ProtocolV2(AsyncConnection *connection);
   virtual ~ProtocolV2();