]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: ProtocolV1: use continuations instead of direct function calls 24305/head
authorRicardo Dias <rdias@suse.com>
Thu, 27 Sep 2018 15:09:45 +0000 (16:09 +0100)
committerRicardo Dias <rdias@suse.com>
Thu, 27 Sep 2018 15:58:37 +0000 (16:58 +0100)
When using direct function calls to implement the V1 state machine,
we might fill up the stack due to state machine cycles.
With continuations we fix this problem by limiting the depth of
the call stack.

Fixes: http://tracker.ceph.com/issues/36167
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/Protocol.cc
src/msg/async/Protocol.h

index cf2110d92a541a681085d00f787bc7f3a537b4f6..7f2c6e869bba4e6503dc8ceb1537b27f1c770583 100644 (file)
@@ -160,21 +160,18 @@ void AsyncConnection::maybe_start_delay_thread()
 }
 
 
-void AsyncConnection::read(unsigned len, char *buffer,
-                           std::function<void(char *, ssize_t)> callback) {
+ssize_t AsyncConnection::read(unsigned len, char *buffer,
+                              std::function<void(char *, ssize_t)> callback) {
   ldout(async_msgr->cct, 20) << __func__
                              << (pendingReadLen ? " continue" : " start")
                              << " len=" << len << dendl;
-  readCallback = callback;
-  pendingReadLen = len;
-  read_buffer = buffer;
   ssize_t r = read_until(len, buffer);
-  if (r <= 0) {
-    // read all bytes, or an error occured
-    pendingReadLen.reset();
-    read_buffer = nullptr;
-    callback(buffer, r);
+  if (r > 0) {
+    readCallback = callback;
+    pendingReadLen = len;
+    read_buffer = buffer;
   }
+  return r;
 }
 
 // Because this func will be called multi times to populate
@@ -283,20 +280,17 @@ ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
   return nread;
 }
 
-void AsyncConnection::write(bufferlist &bl,
-                            std::function<void(ssize_t)> callback, bool more) {
+ssize_t AsyncConnection::write(bufferlist &bl,
+                               std::function<void(ssize_t)> callback,
+                               bool more) {
 
     std::unique_lock<std::mutex> l(write_lock);
-    writeCallback = callback;
     outcoming_bl.claim_append(bl);
     ssize_t r = _try_send(more);
-    if (r <= 0) {
-      // either finish writting, or returned an error
-      writeCallback.reset();
-      l.unlock();
-      callback(r);
-      return;
+    if (r > 0) {
+      writeCallback = callback;
     }
+    return r;
 }
 
 // return the remaining bytes, it may larger than the length of ptr
@@ -423,7 +417,13 @@ void AsyncConnection::process() {
 
     case STATE_CONNECTION_ESTABLISHED: {
       if (pendingReadLen) {
-        read(*pendingReadLen, read_buffer, readCallback);
+        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
+        if (r <= 0) { // read all bytes, or an error occured
+          pendingReadLen.reset();
+          char *buf_tmp = read_buffer;
+          read_buffer = nullptr;
+          readCallback(buf_tmp, r);
+        }
         return;
       }
       break;
index 78025d8342ba353f6e76f77e3110a8ff80b0b9e7..17bd1b39a93cb9f9b082520d1e9ba26305850cce 100644 (file)
@@ -52,13 +52,13 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
  */
 class AsyncConnection : public Connection {
 
-  void read(unsigned len, char *buffer,
-            std::function<void(char *, ssize_t)> callback);
+  ssize_t read(unsigned len, char *buffer,
+               std::function<void(char *, ssize_t)> callback);
   ssize_t read_until(unsigned needed, char *p);
   ssize_t read_bulk(char *buf, unsigned len);
 
-  void write(bufferlist &bl, std::function<void(ssize_t)> callback,
-             bool more=false);
+  ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback,
+                bool more=false);
   ssize_t _try_send(bool more=false);
 
   void _connect();
index bcddc6989e52bd159983960248176f91f3bb543b..32eaa5f97f23bc203d28807533a13f331b84c620 100644 (file)
@@ -19,17 +19,11 @@ ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
                 << " l=" << connection->policy.lossy << ").";
 }
 
-#define WRITE(B, F) \
-  connection->write(B, std::bind(F, this, std::placeholders::_1))
+#define WRITE(B, C) write(CONTINUATION(C), B)
 
-#define READ(L, F)    \
-  connection->read(   \
-      L, temp_buffer, \
-      std::bind(F, this, std::placeholders::_1, std::placeholders::_2))
+#define READ(L, C) read(CONTINUATION(C), L)
 
-#define READB(L, B, F) \
-  connection->read(    \
-      L, B, std::bind(F, this, std::placeholders::_1, std::placeholders::_2))
+#define READB(L, B, C) read(CONTINUATION(C), L, B)
 
 // Constant to limit starting sequence number to 2^31.  Nothing special about
 // it, just a big number.  PLR
@@ -292,22 +286,22 @@ void ProtocolV1::read_event() {
   ldout(cct, 20) << __func__ << dendl;
   switch (state) {
     case START_CONNECT:
-      send_client_banner();
+      CONTINUATION_RUN(CONTINUATION(send_client_banner));
       break;
     case START_ACCEPT:
-      send_server_banner();
+      CONTINUATION_RUN(CONTINUATION(send_server_banner));
       break;
     case OPENED:
-      wait_message();
+      CONTINUATION_RUN(CONTINUATION(wait_message));
       break;
     case THROTTLE_MESSAGE:
-      throttle_message();
+      CONTINUATION_RUN(CONTINUATION(throttle_message));
       break;
     case THROTTLE_BYTES:
-      throttle_bytes();
+      CONTINUATION_RUN(CONTINUATION(throttle_bytes));
       break;
     case THROTTLE_DISPATCH_QUEUE:
-      throttle_dispatch_queue();
+      CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
       break;
     default:
       break;
@@ -413,7 +407,41 @@ void ProtocolV1::write_event() {
 
 bool ProtocolV1::is_queued() { return !out_q.empty(); }
 
-void ProtocolV1::ready() {
+void ProtocolV1::run_continuation(CtPtr continuation) {
+  CONTINUATION_RUN(continuation);
+}
+
+CtPtr ProtocolV1::read(CONTINUATION_PARAM(next, ProtocolV1, char *, int),
+                       int len, char *buffer) {
+  if (!buffer) {
+    buffer = temp_buffer;
+  }
+  ssize_t r = connection->read(len, buffer,
+                               [CONTINUATION(next), this](char *buffer, int r) {
+                                 CONTINUATION(next)->setParams(buffer, r);
+                                 CONTINUATION_RUN(CONTINUATION(next));
+                               });
+  if (r <= 0) {
+    return CONTINUE(next, buffer, r);
+  }
+
+  return nullptr;
+}
+
+CtPtr ProtocolV1::write(CONTINUATION_PARAM(next, ProtocolV1, int),
+                        bufferlist &buffer) {
+  ssize_t r = connection->write(buffer, [CONTINUATION(next), this](int r) {
+    CONTINUATION(next)->setParams(r);
+    CONTINUATION_RUN(CONTINUATION(next));
+  });
+  if (r <= 0) {
+    return CONTINUE(next, r);
+  }
+
+  return nullptr;
+}
+
+CtPtr ProtocolV1::ready() {
   ldout(cct, 25) << __func__ << dendl;
 
   // make sure no pending tick timer
@@ -432,26 +460,25 @@ void ProtocolV1::ready() {
   connection->maybe_start_delay_thread();
 
   state = OPENED;
-  wait_message();
+  return wait_message();
 }
 
-void ProtocolV1::wait_message() {
+CtPtr ProtocolV1::wait_message() {
   if (state != OPENED) {  // must have changed due to a replace
-    return;
+    return nullptr;
   }
 
   ldout(cct, 20) << __func__ << dendl;
 
-  READ(sizeof(char), &ProtocolV1::handle_message);
+  return READ(sizeof(char), handle_message);
 }
 
-void ProtocolV1::handle_message(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read tag failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   char tag = buffer[0];
@@ -461,34 +488,34 @@ void ProtocolV1::handle_message(char *buffer, int r) {
     ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
     connection->set_last_keepalive(ceph_clock_now());
   } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
-    READ(sizeof(ceph_timespec), &ProtocolV1::handle_keepalive2);
+    return READ(sizeof(ceph_timespec), handle_keepalive2);
   } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
-    READ(sizeof(ceph_timespec), &ProtocolV1::handle_keepalive2_ack);
+    return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
   } else if (tag == CEPH_MSGR_TAG_ACK) {
-    READ(sizeof(ceph_le64), &ProtocolV1::handle_tag_ack);
+    return READ(sizeof(ceph_le64), handle_tag_ack);
   } else if (tag == CEPH_MSGR_TAG_MSG) {
 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
     ltt_recv_stamp = ceph_clock_now();
 #endif
     recv_stamp = ceph_clock_now();
     ldout(cct, 20) << __func__ << " begin MSG" << dendl;
-    READ(sizeof(ceph_msg_header), &ProtocolV1::handle_message_header);
+    return READ(sizeof(ceph_msg_header), handle_message_header);
   } else if (tag == CEPH_MSGR_TAG_CLOSE) {
     ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
     stop();
   } else {
     ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
-    fault();
+    return _fault();
   }
+  return nullptr;
 }
 
-void ProtocolV1::handle_keepalive2(char *buffer, int r) {
+CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
@@ -507,7 +534,7 @@ void ProtocolV1::handle_keepalive2(char *buffer, int r) {
     connection->center->dispatch_event_external(connection->write_handler);
   }
 
-  wait_message();
+  return CONTINUE(wait_message);
 }
 
 void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
@@ -529,13 +556,12 @@ void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
   }
 }
 
-void ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
+CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ceph_timespec *t;
@@ -543,16 +569,15 @@ void ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
   connection->set_last_keepalive_ack(utime_t(*t));
   ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
 
-  wait_message();
+  return CONTINUE(wait_message);
 }
 
-void ProtocolV1::handle_tag_ack(char *buffer, int r) {
+CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ceph_le64 seq;
@@ -578,16 +603,15 @@ void ProtocolV1::handle_tag_ack(char *buffer, int r) {
     pending[k]->put();
   }
 
-  wait_message();
+  return CONTINUE(wait_message);
 }
 
-void ProtocolV1::handle_message_header(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read message header failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 20) << __func__ << " got MSG header" << dendl;
@@ -608,8 +632,7 @@ void ProtocolV1::handle_message_header(char *buffer, int r) {
     if (header_crc != header.crc) {
       ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
                     << " != " << header.crc << dendl;
-      fault();
-      return;
+      return _fault();
     }
   }
 
@@ -621,10 +644,10 @@ void ProtocolV1::handle_message_header(char *buffer, int r) {
   current_header = header;
 
   state = THROTTLE_MESSAGE;
-  throttle_message();
+  return CONTINUE(throttle_message);
 }
 
-void ProtocolV1::throttle_message() {
+CtPtr ProtocolV1::throttle_message() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (connection->policy.throttler_messages) {
@@ -645,15 +668,15 @@ void ProtocolV1::throttle_message() {
             connection->center->create_time_event(1000,
                                                   connection->wakeup_handler));
       }
-      return;
+      return nullptr;
     }
   }
 
   state = THROTTLE_BYTES;
-  throttle_bytes();
+  return CONTINUE(throttle_bytes);
 }
 
-void ProtocolV1::throttle_bytes() {
+CtPtr ProtocolV1::throttle_bytes() {
   ldout(cct, 20) << __func__ << dendl;
 
   cur_msg_size = current_header.front_len + current_header.middle_len +
@@ -677,16 +700,16 @@ void ProtocolV1::throttle_bytes() {
               connection->center->create_time_event(
                   1000, connection->wakeup_handler));
         }
-        return;
+        return nullptr;
       }
     }
   }
 
   state = THROTTLE_DISPATCH_QUEUE;
-  throttle_dispatch_queue();
+  return CONTINUE(throttle_dispatch_queue);
 }
 
-void ProtocolV1::throttle_dispatch_queue() {
+CtPtr ProtocolV1::throttle_dispatch_queue() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (cur_msg_size) {
@@ -705,17 +728,17 @@ void ProtocolV1::throttle_dispatch_queue() {
             connection->center->create_time_event(1000,
                                                   connection->wakeup_handler));
       }
-      return;
+      return nullptr;
     }
   }
 
   throttle_stamp = ceph_clock_now();
 
   state = READ_MESSAGE_FRONT;
-  read_message_front();
+  return read_message_front();
 }
 
-void ProtocolV1::read_message_front() {
+CtPtr ProtocolV1::read_message_front() {
   ldout(cct, 20) << __func__ << dendl;
 
   unsigned front_len = current_header.front_len;
@@ -723,55 +746,52 @@ void ProtocolV1::read_message_front() {
     if (!front.length()) {
       front.push_back(buffer::create(front_len));
     }
-    READB(front_len, front.c_str(), &ProtocolV1::handle_message_front);
-  } else {
-    read_message_middle();
+    return READB(front_len, front.c_str(), handle_message_front);
   }
+  return read_message_middle();
 }
 
-void ProtocolV1::handle_message_front(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read message front failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
 
-  read_message_middle();
+  return read_message_middle();
 }
 
-void ProtocolV1::read_message_middle() {
+CtPtr ProtocolV1::read_message_middle() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (current_header.middle_len) {
     if (!middle.length()) {
       middle.push_back(buffer::create(current_header.middle_len));
     }
-    READB(current_header.middle_len, middle.c_str(),
-          &ProtocolV1::handle_message_middle);
-  } else {
-    read_message_data_prepare();
+    return READB(current_header.middle_len, middle.c_str(),
+                 handle_message_middle);
   }
+
+  return read_message_data_prepare();
 }
 
-void ProtocolV1::handle_message_middle(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
 
-  read_message_data_prepare();
+  return read_message_data_prepare();
 }
 
-void ProtocolV1::read_message_data_prepare() {
+CtPtr ProtocolV1::read_message_data_prepare() {
   ldout(cct, 20) << __func__ << dendl;
 
   unsigned data_len = le32_to_cpu(current_header.data_len);
@@ -800,29 +820,28 @@ void ProtocolV1::read_message_data_prepare() {
 
   msg_left = data_len;
 
-  read_message_data();
+  return CONTINUE(read_message_data);
 }
 
-void ProtocolV1::read_message_data() {
+CtPtr ProtocolV1::read_message_data() {
   ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
 
   if (msg_left > 0) {
     bufferptr bp = data_blp.get_current_ptr();
     unsigned read_len = std::min(bp.length(), msg_left);
 
-    READB(read_len, bp.c_str(), &ProtocolV1::handle_message_data);
-  } else {
-    read_message_footer();
+    return READB(read_len, bp.c_str(), handle_message_data);
   }
+
+  return read_message_footer();
 }
 
-void ProtocolV1::handle_message_data(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read data error " << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   bufferptr bp = data_blp.get_current_ptr();
@@ -831,10 +850,10 @@ void ProtocolV1::handle_message_data(char *buffer, int r) {
   data.append(bp, 0, read_len);
   msg_left -= read_len;
 
-  read_message_data();
+  return CONTINUE(read_message_data);
 }
 
-void ProtocolV1::read_message_footer() {
+CtPtr ProtocolV1::read_message_footer() {
   ldout(cct, 20) << __func__ << dendl;
 
   state = READ_FOOTER_AND_DISPATCH;
@@ -846,16 +865,15 @@ void ProtocolV1::read_message_footer() {
     len = sizeof(ceph_msg_footer_old);
   }
 
-  READ(len, &ProtocolV1::handle_message_footer);
+  return READ(len, handle_message_footer);
 }
 
-void ProtocolV1::handle_message_footer(char *buffer, int r) {
+CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read footer data error " << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ceph_msg_footer footer;
@@ -878,8 +896,7 @@ void ProtocolV1::handle_message_footer(char *buffer, int r) {
     ldout(cct, 0) << __func__ << " got " << front.length() << " + "
                   << middle.length() << " + " << data.length()
                   << " byte message.. ABORTED" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 20) << __func__ << " got " << front.length() << " + "
@@ -889,8 +906,7 @@ void ProtocolV1::handle_message_footer(char *buffer, int r) {
                                     footer, front, middle, data, connection);
   if (!message) {
     ldout(cct, 1) << __func__ << " decode message failed " << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   //
@@ -904,8 +920,7 @@ void ProtocolV1::handle_message_footer(char *buffer, int r) {
     if (session_security->check_message_signature(message)) {
       ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
       message->put();
-      fault();
-      return;
+      return _fault();
     }
   }
   message->set_byte_throttler(connection->policy.throttler_bytes);
@@ -934,7 +949,7 @@ void ProtocolV1::handle_message_footer(char *buffer, int r) {
         cct->_conf->ms_die_on_old_message) {
       ceph_assert(0 == "old msgs despite reconnect_seq feature");
     }
-    return;
+    return nullptr;
   }
   if (message->get_seq() > cur_seq + 1) {
     ldout(cct, 0) << __func__ << " missed message?  skipped from seq "
@@ -1017,7 +1032,7 @@ void ProtocolV1::handle_message_footer(char *buffer, int r) {
     connection->center->dispatch_event_external(connection->write_handler);
   }
 
-  wait_message();
+  return CONTINUE(wait_message);
 }
 
 void ProtocolV1::session_reset() {
@@ -1269,30 +1284,29 @@ Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) {
  * Client Protocol V1
  **/
 
-void ProtocolV1::send_client_banner() {
+CtPtr ProtocolV1::send_client_banner() {
   ldout(cct, 20) << __func__ << dendl;
   state = CONNECTING;
 
   bufferlist bl;
   bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
-  WRITE(bl, &ProtocolV1::handle_client_banner_write);
+  return WRITE(bl, handle_client_banner_write);
 }
 
-void ProtocolV1::handle_client_banner_write(int r) {
+CtPtr ProtocolV1::handle_client_banner_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
   ldout(cct, 10) << __func__ << " connect write banner done: "
                  << connection->get_peer_addr() << dendl;
 
-  wait_server_banner();
+  return wait_server_banner();
 }
 
-void ProtocolV1::wait_server_banner() {
+CtPtr ProtocolV1::wait_server_banner() {
   state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
 
   ldout(cct, 20) << __func__ << dendl;
@@ -1300,25 +1314,23 @@ void ProtocolV1::wait_server_banner() {
   bufferlist myaddrbl;
   unsigned banner_len = strlen(CEPH_BANNER);
   unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
-  READ(need_len, &ProtocolV1::handle_server_banner_and_identify);
+  return READ(need_len, handle_server_banner_and_identify);
 }
 
-void ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
+CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
                   << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   unsigned banner_len = strlen(CEPH_BANNER);
   if (memcmp(buffer, CEPH_BANNER, banner_len)) {
     ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
                   << connection->get_peer_addr() << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   bufferlist bl;
@@ -1331,8 +1343,7 @@ void ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
     decode(peer_addr_for_me, p);
   } catch (const buffer::error &e) {
     lderr(cct) << __func__ << " decode peer addr failed " << dendl;
-    fault();
-    return;
+    return _fault();
   }
   ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
                  << " on socket " << connection->cs.fd() << dendl;
@@ -1347,8 +1358,7 @@ void ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
     } else {
       ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
                      << peer_addr << dendl;
-      fault();
-      return;
+      return _fault();
     }
   }
 
@@ -1372,30 +1382,29 @@ void ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
     ldout(cct, 1) << __func__
                   << " state changed while learned_addr, mark_down or "
                   << " replacing must be happened just now" << dendl;
-    return;
+    return nullptr;
   }
 
   bufferlist myaddrbl;
   encode(messenger->get_myaddrs().legacy_addr(), myaddrbl, 0);  // legacy
-  WRITE(myaddrbl, &ProtocolV1::handle_my_addr_write);
+  return WRITE(myaddrbl, handle_my_addr_write);
 }
 
-void ProtocolV1::handle_my_addr_write(int r) {
+CtPtr ProtocolV1::handle_my_addr_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
                   << cpp_strerror(r) << dendl;
-    fault();
-    return;
+    return _fault();
   }
   ldout(cct, 10) << __func__ << " connect sent my addr "
                  << messenger->get_myaddrs().legacy_addr() << dendl;
 
-  send_connect_message();
+  return CONTINUE(send_connect_message);
 }
 
-void ProtocolV1::send_connect_message() {
+CtPtr ProtocolV1::send_connect_message() {
   state = CONNECTING_SEND_CONNECT_MSG;
 
   ldout(cct, 20) << __func__ << dendl;
@@ -1436,39 +1445,37 @@ void ProtocolV1::send_connect_message() {
                  << " cseq=" << connect_seq
                  << " proto=" << connect.protocol_version << dendl;
 
-  WRITE(bl, &ProtocolV1::handle_connect_message_write);
+  return WRITE(bl, handle_connect_message_write);
 }
 
-void ProtocolV1::handle_connect_message_write(int r) {
+CtPtr ProtocolV1::handle_connect_message_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 2) << __func__ << " connect couldn't send reply "
                   << cpp_strerror(r) << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 20) << __func__
                  << " connect wrote (self +) cseq, waiting for reply" << dendl;
 
-  wait_connect_reply();
+  return wait_connect_reply();
 }
 
-void ProtocolV1::wait_connect_reply() {
+CtPtr ProtocolV1::wait_connect_reply() {
   ldout(cct, 20) << __func__ << dendl;
 
   memset(&connect_reply, 0, sizeof(connect_reply));
-  READ(sizeof(connect_reply), &ProtocolV1::handle_connect_reply_1);
+  return READ(sizeof(connect_reply), handle_connect_reply_1);
 }
 
-void ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   connect_reply = *((ceph_msg_connect_reply *)buffer);
@@ -1482,14 +1489,13 @@ void ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
                  << connect_reply.features << dendl;
 
   if (connect_reply.authorizer_len) {
-    wait_connect_reply_auth();
-    return;
+    return wait_connect_reply_auth();
   }
 
-  handle_connect_reply_2();
+  return handle_connect_reply_2();
 }
 
-void ProtocolV1::wait_connect_reply_auth() {
+CtPtr ProtocolV1::wait_connect_reply_auth() {
   ldout(cct, 20) << __func__ << dendl;
 
   ldout(cct, 10) << __func__
@@ -1498,17 +1504,16 @@ void ProtocolV1::wait_connect_reply_auth() {
 
   ceph_assert(connect_reply.authorizer_len < 4096);
 
-  READ(connect_reply.authorizer_len, &ProtocolV1::handle_connect_reply_auth);
+  return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
 }
 
-void ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
                   << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   bufferlist authorizer_reply;
@@ -1517,21 +1522,19 @@ void ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
   if (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
     ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl;
     authorizer->add_challenge(cct, authorizer_reply);
-    send_connect_message();
-    return;
+    return CONTINUE(send_connect_message);
   }
 
   auto iter = authorizer_reply.cbegin();
   if (authorizer && !authorizer->verify_reply(iter)) {
     ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
-  handle_connect_reply_2();
+  return handle_connect_reply_2();
 }
 
-void ProtocolV1::handle_connect_reply_2() {
+CtPtr ProtocolV1::handle_connect_reply_2() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
@@ -1541,30 +1544,26 @@ void ProtocolV1::handle_connect_reply_2() {
                   << (connect_reply.features &
                       ~connection->policy.features_supported)
                   << std::dec << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
     ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
                   << messenger->get_proto_version(connection->peer_type, true)
                   << " != " << connect_reply.protocol_version << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
     ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
     if (got_bad_auth) {
-      fault();
-      return;
+      return _fault();
     }
     got_bad_auth = true;
     delete authorizer;
     authorizer =
         messenger->get_authorizer(connection->peer_type, true);  // try harder
-    send_connect_message();
-    return;
+    return CONTINUE(send_connect_message);
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
@@ -1575,8 +1574,7 @@ void ProtocolV1::handle_connect_reply_2() {
     // see session_reset
     connection->outcoming_bl.clear();
 
-    send_connect_message();
-    return;
+    return CONTINUE(send_connect_message);
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
@@ -1584,8 +1582,7 @@ void ProtocolV1::handle_connect_reply_2() {
     ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
                   << connect_reply.global_seq << " chose new " << global_seq
                   << dendl;
-    send_connect_message();
-    return;
+    return CONTINUE(send_connect_message);
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
@@ -1593,15 +1590,13 @@ void ProtocolV1::handle_connect_reply_2() {
     ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
                   << " -> " << connect_reply.connect_seq << dendl;
     connect_seq = connect_reply.connect_seq;
-    send_connect_message();
-    return;
+    return CONTINUE(send_connect_message);
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
     ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
     state = WAIT;
-    fault();
-    return;
+    return _fault();
   }
 
   uint64_t feat_missing;
@@ -1610,8 +1605,7 @@ void ProtocolV1::handle_connect_reply_2() {
   if (feat_missing) {
     ldout(cct, 1) << __func__ << " missing required features " << std::hex
                   << feat_missing << std::dec << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
@@ -1620,30 +1614,28 @@ void ProtocolV1::handle_connect_reply_2() {
         << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
         << dendl;
 
-    wait_ack_seq();
-    return;
+    return wait_ack_seq();
   }
 
   if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
     ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
   }
 
-  client_ready();
+  return client_ready();
 }
 
-void ProtocolV1::wait_ack_seq() {
+CtPtr ProtocolV1::wait_ack_seq() {
   ldout(cct, 20) << __func__ << dendl;
 
-  READ(sizeof(uint64_t), &ProtocolV1::handle_ack_seq);
+  return READ(sizeof(uint64_t), handle_ack_seq);
 }
 
-void ProtocolV1::handle_ack_seq(char *buffer, int r) {
+CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   uint64_t newly_acked_seq = 0;
@@ -1657,24 +1649,23 @@ void ProtocolV1::handle_ack_seq(char *buffer, int r) {
   uint64_t s = in_seq;
   bl.append((char *)&s, sizeof(s));
 
-  WRITE(bl, &ProtocolV1::handle_in_seq_write);
+  return WRITE(bl, handle_in_seq_write);
 }
 
-void ProtocolV1::handle_in_seq_write(int r) {
+CtPtr ProtocolV1::handle_in_seq_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
 
-  client_ready();
+  return client_ready();
 }
 
-void ProtocolV1::client_ready() {
+CtPtr ProtocolV1::client_ready() {
   ldout(cct, 20) << __func__ << dendl;
 
   // hooray!
@@ -1709,14 +1700,14 @@ void ProtocolV1::client_ready() {
   connection->dispatch_queue->queue_connect(connection);
   messenger->ms_deliver_handle_fast_connect(connection);
 
-  ready();
+  return ready();
 }
 
 /**
  * Server Protocol V1
  **/
 
-void ProtocolV1::send_server_banner() {
+CtPtr ProtocolV1::send_server_banner() {
   ldout(cct, 20) << __func__ << dendl;
   state = ACCEPTING;
 
@@ -1732,44 +1723,41 @@ void ProtocolV1::send_server_banner() {
   ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd() << " "
                 << connection->socket_addr << dendl;
 
-  WRITE(bl, &ProtocolV1::handle_server_banner_write);
+  return WRITE(bl, handle_server_banner_write);
 }
 
-void ProtocolV1::handle_server_banner_write(int r) {
+CtPtr ProtocolV1::handle_server_banner_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << " write server banner failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
   ldout(cct, 10) << __func__ << " write banner and addr done: "
                  << connection->get_peer_addr() << dendl;
 
-  wait_client_banner();
+  return wait_client_banner();
 }
 
-void ProtocolV1::wait_client_banner() {
+CtPtr ProtocolV1::wait_client_banner() {
   ldout(cct, 20) << __func__ << dendl;
 
-  READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
-       &ProtocolV1::handle_client_banner);
+  return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
+              handle_client_banner);
 }
 
-void ProtocolV1::handle_client_banner(char *buffer, int r) {
+CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
     ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
                   << "' (should be '" << CEPH_BANNER << "')" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   bufferlist addr_bl;
@@ -1781,8 +1769,7 @@ void ProtocolV1::handle_client_banner(char *buffer, int r) {
     decode(peer_addr, ti);
   } catch (const buffer::error &e) {
     lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
@@ -1798,23 +1785,22 @@ void ProtocolV1::handle_client_banner(char *buffer, int r) {
   connection->set_peer_addr(peer_addr);  // so that connection_state gets set up
   connection->target_addr = peer_addr;
 
-  wait_connect_message();
+  return CONTINUE(wait_connect_message);
 }
 
-void ProtocolV1::wait_connect_message() {
+CtPtr ProtocolV1::wait_connect_message() {
   ldout(cct, 20) << __func__ << dendl;
 
   memset(&connect_msg, 0, sizeof(connect_msg));
-  READ(sizeof(connect_msg), &ProtocolV1::handle_connect_message_1);
+  return READ(sizeof(connect_msg), handle_connect_message_1);
 }
 
-void ProtocolV1::handle_connect_message_1(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   connect_msg = *((ceph_msg_connect *)buffer);
@@ -1822,36 +1808,34 @@ void ProtocolV1::handle_connect_message_1(char *buffer, int r) {
   state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
 
   if (connect_msg.authorizer_len) {
-    wait_connect_message_auth();
-    return;
+    return wait_connect_message_auth();
   }
 
-  handle_connect_message_2();
+  return handle_connect_message_2();
 }
 
-void ProtocolV1::wait_connect_message_auth() {
+CtPtr ProtocolV1::wait_connect_message_auth() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (!authorizer_buf.length()) {
     authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
   }
-  READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
-        &ProtocolV1::handle_connect_message_auth);
+  return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
+               handle_connect_message_auth);
 }
 
-void ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
+CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
-  handle_connect_message_2();
+  return handle_connect_message_2();
 }
 
-void ProtocolV1::handle_connect_message_2() {
+CtPtr ProtocolV1::handle_connect_message_2() {
   ldout(cct, 20) << __func__ << dendl;
 
   ldout(cct, 20) << __func__ << " accept got peer connect_seq "
@@ -1880,9 +1864,8 @@ void ProtocolV1::handle_connect_message_2() {
                  << ", their proto " << connect_msg.protocol_version << dendl;
 
   if (connect_msg.protocol_version != reply.protocol_version) {
-    send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
-                               authorizer_reply);
-    return;
+    return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
+                                      authorizer_reply);
   }
 
   // require signatures for cephx?
@@ -1914,8 +1897,8 @@ void ProtocolV1::handle_connect_message_2() {
   if (feat_missing) {
     ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                   << feat_missing << std::dec << dendl;
-    send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply, authorizer_reply);
-    return;
+    return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
+                                      authorizer_reply);
   }
 
   connection->lock.unlock();
@@ -1933,16 +1916,14 @@ void ProtocolV1::handle_connect_message_2() {
     if (need_challenge && !had_challenge && authorizer_challenge) {
       ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
       ceph_assert(authorizer_reply.length());
-      send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, reply,
-                                 authorizer_reply);
-      return;
+      return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
+                                        reply, authorizer_reply);
     } else {
       ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
                     << authorizer_reply.length() << dendl;
       session_security.reset();
-      send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
-                                 authorizer_reply);
-      return;
+      return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
+                                        authorizer_reply);
     }
   }
 
@@ -1961,8 +1942,7 @@ void ProtocolV1::handle_connect_message_2() {
                   << " state changed while accept, it must be mark_down"
                   << dendl;
     ceph_assert(state == CLOSED);
-    fault();
-    return;
+    return _fault();
   }
 
   if (existing == connection) {
@@ -1987,8 +1967,7 @@ void ProtocolV1::handle_connect_message_2() {
       existing->lock.unlock();
       existing = nullptr;
 
-      open(reply, authorizer_reply);
-      return;
+      return open(reply, authorizer_reply);
     }
 
     if (exproto->replacing) {
@@ -1998,9 +1977,8 @@ void ProtocolV1::handle_connect_message_2() {
                     << connection->get_state_name(existing->state) << dendl;
       reply.global_seq = exproto->peer_global_seq;
       existing->lock.unlock();
-      send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
-                                 authorizer_reply);
-      return;
+      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
+                                        authorizer_reply);
     }
 
     if (connect_msg.global_seq < exproto->peer_global_seq) {
@@ -2009,9 +1987,8 @@ void ProtocolV1::handle_connect_message_2() {
                      << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
       reply.global_seq = exproto->peer_global_seq;  // so we can send it below..
       existing->lock.unlock();
-      send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
-                                 authorizer_reply);
-      return;
+      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
+                                        authorizer_reply);
     } else {
       ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
                      << exproto->peer_global_seq
@@ -2025,8 +2002,7 @@ void ProtocolV1::handle_connect_message_2() {
           << " accept replacing existing (lossy) channel (new one lossy="
           << connection->policy.lossy << ")" << dendl;
       exproto->session_reset();
-      replace(existing, reply, authorizer_reply);
-      return;
+      return replace(existing, reply, authorizer_reply);
     }
 
     ldout(cct, 1) << __func__ << " accept connect_seq "
@@ -2046,8 +2022,7 @@ void ProtocolV1::handle_connect_message_2() {
         exproto->session_reset();  // this resets out_queue, msg_ and
                                    // connect_seq #'s
       }
-      replace(existing, reply, authorizer_reply);
-      return;
+      return replace(existing, reply, authorizer_reply);
     }
 
     if (connect_msg.connect_seq < exproto->connect_seq) {
@@ -2057,9 +2032,8 @@ void ProtocolV1::handle_connect_message_2() {
                      << ", RETRY_SESSION" << dendl;
       reply.connect_seq = exproto->connect_seq + 1;
       existing->lock.unlock();
-      send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
-                                 authorizer_reply);
-      return;
+      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
+                                        authorizer_reply);
     }
 
     if (connect_msg.connect_seq == exproto->connect_seq) {
@@ -2075,15 +2049,13 @@ void ProtocolV1::handle_connect_message_2() {
         // if connect_seq both zero, dont stuck into dead lock. it's ok to
         // replace
         if (connection->policy.resetcheck && exproto->connect_seq == 0) {
-          replace(existing, reply, authorizer_reply);
-          return;
+          return replace(existing, reply, authorizer_reply);
         }
 
         reply.connect_seq = exproto->connect_seq + 1;
         existing->lock.unlock();
-        send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
-                                   authorizer_reply);
-        return;
+        return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
+                                          authorizer_reply);
       }
 
       // connection race?
@@ -2094,8 +2066,7 @@ void ProtocolV1::handle_connect_message_2() {
                        << existing << ".cseq " << exproto->connect_seq
                        << " == " << connect_msg.connect_seq
                        << ", or we are server, replacing my attempt" << dendl;
-        replace(existing, reply, authorizer_reply);
-        return;
+        return replace(existing, reply, authorizer_reply);
       } else {
         // our existing outgoing wins
         ldout(messenger->cct, 10)
@@ -2105,8 +2076,8 @@ void ProtocolV1::handle_connect_message_2() {
         ceph_assert(connection->peer_addrs.legacy_addr() >
                     messenger->get_myaddr());
         existing->lock.unlock();
-        send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply, authorizer_reply);
-        return;
+        return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
+                                          authorizer_reply);
       }
     }
 
@@ -2120,38 +2091,34 @@ void ProtocolV1::handle_connect_message_2() {
                     << ".cseq = " << exproto->connect_seq
                     << "), sending RESETSESSION " << dendl;
       existing->lock.unlock();
-      send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
-                                 authorizer_reply);
-      return;
+      return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
+                                        authorizer_reply);
     }
 
     // reconnect
     ldout(cct, 10) << __func__ << " accept peer sent cseq "
                    << connect_msg.connect_seq << " > " << exproto->connect_seq
                    << dendl;
-    replace(existing, reply, authorizer_reply);
-    return;
+    return replace(existing, reply, authorizer_reply);
   }  // existing
   else if (!replacing && connect_msg.connect_seq > 0) {
     // we reset, and they are opening a new session
     ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
                   << connect_msg.connect_seq << "), sending RESETSESSION"
                   << dendl;
-    send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
-                               authorizer_reply);
-    return;
+    return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
+                                      authorizer_reply);
   } else {
     // new session
     ldout(cct, 10) << __func__ << " accept new session" << dendl;
     existing = nullptr;
-    open(reply, authorizer_reply);
-    return;
+    return open(reply, authorizer_reply);
   }
 }
 
-void ProtocolV1::send_connect_message_reply(char tag,
-                                            ceph_msg_connect_reply &reply,
-                                            bufferlist &authorizer_reply) {
+CtPtr ProtocolV1::send_connect_message_reply(char tag,
+                                             ceph_msg_connect_reply &reply,
+                                             bufferlist &authorizer_reply) {
   ldout(cct, 20) << __func__ << dendl;
   bufferlist reply_bl;
   reply.tag = tag;
@@ -2166,25 +2133,24 @@ void ProtocolV1::send_connect_message_reply(char tag,
     authorizer_reply.clear();
   }
 
-  WRITE(reply_bl, &ProtocolV1::handle_connect_message_reply_write);
+  return WRITE(reply_bl, handle_connect_message_reply_write);
 }
 
-void ProtocolV1::handle_connect_message_reply_write(int r) {
+CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << " write connect message reply failed" << dendl;
     connection->inject_delay();
-    fault();
-    return;
+    return _fault();
   }
 
-  wait_connect_message();
+  return CONTINUE(wait_connect_message);
 }
 
-void ProtocolV1::replace(AsyncConnectionRef existing,
-                         ceph_msg_connect_reply &reply,
-                         bufferlist &authorizer_reply) {
+CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
+                          ceph_msg_connect_reply &reply,
+                          bufferlist &authorizer_reply) {
   ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
 
   connection->inject_delay();
@@ -2287,8 +2253,8 @@ void ProtocolV1::replace(AsyncConnectionRef existing,
             existing->center->create_file_event(
                 existing->cs.fd(), EVENT_READABLE, existing->read_handler);
             reply.global_seq = exproto->peer_global_seq;
-            exproto->send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL,
-                                                reply, authorizer_reply);
+            exproto->run_continuation(exproto->send_connect_message_reply(
+                CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
           };
           if (existing->center->in_thread())
             transfer_existing();
@@ -2302,15 +2268,15 @@ void ProtocolV1::replace(AsyncConnectionRef existing,
                                 std::move(deactivate_existing), true);
     existing->write_lock.unlock();
     existing->lock.unlock();
-    return;
+    return nullptr;
   }
   existing->lock.unlock();
 
-  open(reply, authorizer_reply);
+  return open(reply, authorizer_reply);
 }
 
-void ProtocolV1::open(ceph_msg_connect_reply &reply,
-                      bufferlist &authorizer_reply) {
+CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
+                       bufferlist &authorizer_reply) {
   ldout(cct, 20) << __func__ << dendl;
 
   connect_seq = connect_msg.connect_seq + 1;
@@ -2378,8 +2344,7 @@ void ProtocolV1::open(ceph_msg_connect_reply &reply,
                   << " just fail later one(this)" << dendl;
     ldout(cct, 10) << "accept fault after register" << dendl;
     connection->inject_delay();
-    fault();
-    return;
+    return _fault();
   }
   if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(cct, 1) << __func__
@@ -2388,21 +2353,19 @@ void ProtocolV1::open(ceph_msg_connect_reply &reply,
     ceph_assert(state == CLOSED || state == NONE);
     ldout(cct, 10) << "accept fault after register" << dendl;
     connection->inject_delay();
-    fault();
-    return;
+    return _fault();
   }
 
-  WRITE(reply_bl, &ProtocolV1::handle_ready_connect_message_reply_write);
+  return WRITE(reply_bl, handle_ready_connect_message_reply_write);
 }
 
-void ProtocolV1::handle_ready_connect_message_reply_write(int r) {
+CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " write ready connect message reply failed"
                   << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   // notify
@@ -2413,25 +2376,24 @@ void ProtocolV1::handle_ready_connect_message_reply_write(int r) {
   state = ACCEPTING_HANDLED_CONNECT_MSG;
 
   if (wait_for_seq) {
-    wait_seq();
-  } else {
-    server_ready();
+    return wait_seq();
   }
+
+  return server_ready();
 }
 
-void ProtocolV1::wait_seq() {
+CtPtr ProtocolV1::wait_seq() {
   ldout(cct, 20) << __func__ << dendl;
 
-  READ(sizeof(uint64_t), &ProtocolV1::handle_seq);
+  return READ(sizeof(uint64_t), handle_seq);
 }
 
-void ProtocolV1::handle_seq(char *buffer, int r) {
+CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
-    fault();
-    return;
+    return _fault();
   }
 
   uint64_t newly_acked_seq = *(uint64_t *)buffer;
@@ -2439,10 +2401,10 @@ void ProtocolV1::handle_seq(char *buffer, int r) {
                 << dendl;
   out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
 
-  server_ready();
+  return server_ready();
 }
 
-void ProtocolV1::server_ready() {
+CtPtr ProtocolV1::server_ready() {
   ldout(cct, 20) << __func__ << dendl;
 
   ldout(cct, 20) << __func__ << " accept done" << dendl;
@@ -2452,5 +2414,5 @@ void ProtocolV1::server_ready() {
     ceph_assert(connection->delay_state->ready());
   }
 
-  ready();
+  return ready();
 }
index 88dbb6b1077f6930eca88b32bae2c2864140373e..f564f2dd5bc447b04f1a18a29443718c631ad99c 100644 (file)
@@ -8,6 +8,61 @@
 #include "include/buffer.h"
 #include "include/msgr.h"
 
+/*
+ * Continuation Helper Classes
+ */
+
+#include <memory>
+#include <tuple>
+
+template <class C>
+class Ct {
+public:
+  virtual ~Ct() {}
+  virtual Ct<C> *call(C *foo) const = 0;
+};
+
+template <class C, typename... Args>
+class CtFun : public Ct<C> {
+private:
+  using fn = Ct<C> *(C::*)(Args...);
+  fn _f;
+  std::tuple<Args...> _params;
+
+  template <std::size_t... Is>
+  inline Ct<C> *_call(C *foo, std::index_sequence<Is...>) const {
+    return (foo->*_f)(std::get<Is>(_params)...);
+  }
+
+public:
+  CtFun(fn f) : _f(f) {}
+
+  inline void setParams(Args... args) { _params = std::make_tuple(args...); }
+  inline Ct<C> *call(C *foo) const override {
+    return _call(foo, std::index_sequence_for<Args...>());
+  }
+};
+
+#define CONTINUATION_DECL(C, F, ...)                \
+  std::unique_ptr<CtFun<C, ##__VA_ARGS__>> F##_cont_ =  \
+      std::make_unique<CtFun<C, ##__VA_ARGS__>>(&C::F); \
+  CtFun<C, ##__VA_ARGS__> *F##_cont = F##_cont_.get()
+
+#define CONTINUATION_PARAM(V, C, ...) CtFun<C, ##__VA_ARGS__> *V##_cont
+
+#define CONTINUATION(F) F##_cont
+#define CONTINUE(F, ...) F##_cont->setParams(__VA_ARGS__), F##_cont
+
+#define CONTINUATION_RUN(CT)                                      \
+  {                                                               \
+    Ct<std::remove_reference<decltype(*this)>::type> *_cont = CT; \
+    while (_cont) {                                               \
+      _cont = _cont->call(this);                                  \
+    }                                                             \
+  }
+
+//////////////////////////////////////////////////////////////////////
+
 class AsyncMessenger;
 
 class Protocol {
@@ -40,6 +95,11 @@ public:
   virtual bool is_queued() = 0;
 };
 
+class ProtocolV1;
+using CtPtr = Ct<ProtocolV1>*;
+#define READ_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, char*, int)
+#define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
+
 class ProtocolV1 : public Protocol {
 /*
  *  ProtocolV1 State Machine
@@ -87,6 +147,7 @@ handle_tag_ack           |              v                                 |
 */
 
 protected:
+
   enum State {
     NONE = 0,
     START_CONNECT,
@@ -174,28 +235,52 @@ protected:
 
   State state;
 
-  void ready();
-  void wait_message();
-  void handle_message(char *buffer, int r);
+  void run_continuation(CtPtr continuation);
+  CtPtr read(CONTINUATION_PARAM(next, ProtocolV1, char *, int), int len,
+             char *buffer = nullptr);
+  CtPtr write(CONTINUATION_PARAM(next, ProtocolV1, int), bufferlist &bl);
+  inline CtPtr _fault() {  // helper fault method that stops continuation
+    fault();
+    return nullptr;
+  }
 
-  void handle_keepalive2(char *buffer, int r);
+  CONTINUATION_DECL(ProtocolV1, wait_message);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2_ack);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_tag_ack);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_header);
+  CONTINUATION_DECL(ProtocolV1, throttle_message);
+  CONTINUATION_DECL(ProtocolV1, throttle_bytes);
+  CONTINUATION_DECL(ProtocolV1, throttle_dispatch_queue);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_front);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_middle);
+  CONTINUATION_DECL(ProtocolV1, read_message_data);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_data);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_footer);
+
+  CtPtr ready();
+  CtPtr wait_message();
+  CtPtr handle_message(char *buffer, int r);
+
+  CtPtr handle_keepalive2(char *buffer, int r);
   void append_keepalive_or_ack(bool ack = false, utime_t *t = nullptr);
-  void handle_keepalive2_ack(char *buffer, int r);
-  void handle_tag_ack(char *buffer, int r);
-
-  void handle_message_header(char *buffer, int r);
-  void throttle_message();
-  void throttle_bytes();
-  void throttle_dispatch_queue();
-  void read_message_front();
-  void handle_message_front(char *buffer, int r);
-  void read_message_middle();
-  void handle_message_middle(char *buffer, int r);
-  void read_message_data_prepare();
-  void read_message_data();
-  void handle_message_data(char *buffer, int r);
-  void read_message_footer();
-  void handle_message_footer(char *buffer, int r);
+  CtPtr handle_keepalive2_ack(char *buffer, int r);
+  CtPtr handle_tag_ack(char *buffer, int r);
+
+  CtPtr handle_message_header(char *buffer, int r);
+  CtPtr throttle_message();
+  CtPtr throttle_bytes();
+  CtPtr throttle_dispatch_queue();
+  CtPtr read_message_front();
+  CtPtr handle_message_front(char *buffer, int r);
+  CtPtr read_message_middle();
+  CtPtr handle_message_middle(char *buffer, int r);
+  CtPtr read_message_data_prepare();
+  CtPtr read_message_data();
+  CtPtr handle_message_data(char *buffer, int r);
+  CtPtr read_message_footer();
+  CtPtr handle_message_footer(char *buffer, int r);
 
   void session_reset();
   void randomize_out_seq();
@@ -235,46 +320,69 @@ private:
   bool got_bad_auth;
   AuthAuthorizer *authorizer;
 
-  void send_client_banner();
-  void handle_client_banner_write(int r);
-  void wait_server_banner();
-  void handle_server_banner_and_identify(char *buffer, int r);
-  void handle_my_addr_write(int r);
-  void send_connect_message();
-  void handle_connect_message_write(int r);
-  void wait_connect_reply();
-  void handle_connect_reply_1(char *buffer, int r);
-  void wait_connect_reply_auth();
-  void handle_connect_reply_auth(char *buffer, int r);
-  void handle_connect_reply_2();
-  void wait_ack_seq();
-  void handle_ack_seq(char *buffer, int r);
-  void handle_in_seq_write(int r);
-  void client_ready();
+  CONTINUATION_DECL(ProtocolV1, send_client_banner);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner_write);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_and_identify);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_my_addr_write);
+  CONTINUATION_DECL(ProtocolV1, send_connect_message);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_write);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_1);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_auth);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_ack_seq);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_in_seq_write);
+
+  CtPtr send_client_banner();
+  CtPtr handle_client_banner_write(int r);
+  CtPtr wait_server_banner();
+  CtPtr handle_server_banner_and_identify(char *buffer, int r);
+  CtPtr handle_my_addr_write(int r);
+  CtPtr send_connect_message();
+  CtPtr handle_connect_message_write(int r);
+  CtPtr wait_connect_reply();
+  CtPtr handle_connect_reply_1(char *buffer, int r);
+  CtPtr wait_connect_reply_auth();
+  CtPtr handle_connect_reply_auth(char *buffer, int r);
+  CtPtr handle_connect_reply_2();
+  CtPtr wait_ack_seq();
+  CtPtr handle_ack_seq(char *buffer, int r);
+  CtPtr handle_in_seq_write(int r);
+  CtPtr client_ready();
 
   // Server Protocol
-private:
+protected:
   bool wait_for_seq;
 
-  void send_server_banner();
-  void handle_server_banner_write(int r);
-  void wait_client_banner();
-  void handle_client_banner(char *buffer, int r);
-  void wait_connect_message();
-  void handle_connect_message_1(char *buffer, int r);
-  void wait_connect_message_auth();
-  void handle_connect_message_auth(char *buffer, int r);
-  void handle_connect_message_2();
-  void send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
-                                  bufferlist &authorizer_reply);
-  void handle_connect_message_reply_write(int r);
-  void replace(AsyncConnectionRef existing, ceph_msg_connect_reply &reply,
-               bufferlist &authorizer_reply);
-  void open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
-  void handle_ready_connect_message_reply_write(int r);
-  void wait_seq();
-  void handle_seq(char *buffer, int r);
-  void server_ready();
+  CONTINUATION_DECL(ProtocolV1, send_server_banner);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_write);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner);
+  CONTINUATION_DECL(ProtocolV1, wait_connect_message);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_1);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_auth);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
+                                  handle_connect_message_reply_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
+                                  handle_ready_connect_message_reply_write);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_seq);
+
+  CtPtr send_server_banner();
+  CtPtr handle_server_banner_write(int r);
+  CtPtr wait_client_banner();
+  CtPtr handle_client_banner(char *buffer, int r);
+  CtPtr wait_connect_message();
+  CtPtr handle_connect_message_1(char *buffer, int r);
+  CtPtr wait_connect_message_auth();
+  CtPtr handle_connect_message_auth(char *buffer, int r);
+  CtPtr handle_connect_message_2();
+  CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
+                                   bufferlist &authorizer_reply);
+  CtPtr handle_connect_message_reply_write(int r);
+  CtPtr replace(AsyncConnectionRef existing, ceph_msg_connect_reply &reply,
+                bufferlist &authorizer_reply);
+  CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
+  CtPtr handle_ready_connect_message_reply_write(int r);
+  CtPtr wait_seq();
+  CtPtr handle_seq(char *buffer, int r);
+  CtPtr server_ready();
 };
 
 class LoopbackProtocolV1 : public ProtocolV1 {