]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: implement reconnect
authorRicardo Dias <rdias@suse.com>
Thu, 8 Nov 2018 17:15:03 +0000 (17:15 +0000)
committerRicardo Dias <rdias@suse.com>
Wed, 23 Jan 2019 13:59:24 +0000 (13:59 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/Protocol.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 4eba36371a5bc878c2014fe826e550fd034e37a0..ddf8bafe363ae3d2810e52e8a0686a5e333ebcf4 100644 (file)
@@ -46,7 +46,7 @@ public:
   }
 };
 
-#define CONTINUATION_DECL(C, F, ...)                \
+#define CONTINUATION_DECL(C, F, ...)                    \
   std::unique_ptr<CtFun<C, ##__VA_ARGS__>> F##_cont_ =  \
       std::make_unique<CtFun<C, ##__VA_ARGS__>>(&C::F); \
   CtFun<C, ##__VA_ARGS__> *F##_cont = F##_cont_.get()
@@ -64,7 +64,16 @@ public:
     }                                                             \
   }
 
-#define READ_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, char*, int)
+#define CONTINUATION_RUN2(I, CT)                               \
+  {                                                            \
+    Ct<std::remove_reference<decltype(*I)>::type> *_cont = CT; \
+    while (_cont) {                                            \
+      _cont = _cont->call(I);                                  \
+    }                                                          \
+  }
+
+#define READ_HANDLER_CONTINUATION_DECL(C, F) \
+  CONTINUATION_DECL(C, F, char *, int)
 #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
 
 //////////////////////////////////////////////////////////////////////
index 9323e71f2c8398138580795445141143822b8485..b8d29b641a54ddf04b8e7967052522d695ac7ad3 100644 (file)
@@ -54,9 +54,11 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       state(NONE),
       peer_required_features(0),
       cookie(0),
+      connect_seq(0),
+      peer_global_seq(0),
       message_seq(0),
+      replacing(false),
       can_write(false),
-      connect_seq(0),
       bannerExchangeCallback(nullptr),
       next_frame_len(0),
       keepalive(false) {
@@ -95,6 +97,30 @@ void ProtocolV2::discard_out_queue() {
   out_queue.clear();
 }
 
+void ProtocolV2::reset_session() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  std::lock_guard<std::mutex> l(connection->write_lock);
+  if (connection->delay_state) {
+    connection->delay_state->discard();
+  }
+
+  connection->dispatch_queue->discard_queue(connection->conn_id);
+  discard_out_queue();
+  connection->outcoming_bl.clear();
+
+  connection->dispatch_queue->queue_remote_reset(connection);
+
+  out_seq = 0;
+  in_seq = 0;
+  cookie = 0;
+  connect_seq = 0;
+  peer_global_seq = 0;
+  message_seq = 0;
+  ack_left = 0;
+  can_write = false;
+}
+
 void ProtocolV2::stop() {
   ldout(cct, 2) << __func__ << dendl;
   if (state == CLOSED) {
@@ -131,6 +157,28 @@ void ProtocolV2::requeue_sent() {
   }
 }
 
+uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
+  ldout(cct, 10) << __func__ << " " << seq << dendl;
+  std::lock_guard<std::mutex> l(connection->write_lock);
+  if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
+    return seq;
+  }
+  list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+  uint64_t count = out_seq;
+  while (!rq.empty()) {
+    pair<bufferlist, Message *> p = rq.front();
+    if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
+    ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
+                   << p.second->get_seq() << " <= " << seq << ", discarding"
+                   << dendl;
+    p.second->put();
+    rq.pop_front();
+    count++;
+  }
+  if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
+  return count;
+}
+
 void ProtocolV2::reset_recv_state() {
   // clean read and write callbacks
   connection->pendingReadLen.reset();
@@ -190,15 +238,16 @@ CtPtr ProtocolV2::_fault() {
   requeue_sent();
 
   if (out_queue.empty() && state >= START_ACCEPT &&
-      state <= ACCEPTED_CLIENT_IDENT) {
+      state <= ACCEPTING_SESSION) {
     ldout(cct, 10) << __func__ << " with nothing to send and in the half "
                    << " accept state just closed" << dendl;
+    connection->write_lock.unlock();
     stop();
     connection->dispatch_queue->queue_reset(connection);
-    connection->write_lock.unlock();
     return nullptr;
   }
 
+  replacing = false;
   connection->fault();
   reset_recv_state();
 
@@ -297,7 +346,7 @@ void ProtocolV2::send_message(Message *m) {
     out_queue[m->get_priority()].emplace_back(std::move(bl), m);
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
-    if (can_write) {
+    if ((!replacing && can_write) || state == STANDBY) {
       connection->center->dispatch_event_external(connection->write_handler);
     }
   }
@@ -520,6 +569,9 @@ void ProtocolV2::write_event() {
     connection->write_lock.lock();
     if (state == STANDBY && !connection->policy.server && is_queued()) {
       ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
+      if (cookie) {  // only increment connect_seq if there is a session
+        connect_seq++;
+      }
       connection->_connect();
     } else if (connection->cs && state != NONE && state != CLOSED &&
                state != START_CONNECT) {
@@ -537,7 +589,9 @@ void ProtocolV2::write_event() {
   }
 }
 
-bool ProtocolV2::is_queued() { return false; }
+bool ProtocolV2::is_queued() {
+  return !out_queue.empty() || connection->is_queued();
+}
 
 CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
                        int len, char *buffer) {
@@ -650,7 +704,7 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
 
     ceph_assert(state == ACCEPTING);
     connection->policy = messenger->get_policy(peer_type);
-    ldout(cct, 10) << __func__ << " accept of host_type " << peer_type
+    ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type
                    << ", policy.lossy=" << connection->policy.lossy
                    << " policy.server=" << connection->policy.server
                    << " policy.standby=" << connection->policy.standby
@@ -692,6 +746,17 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
 
   this->peer_required_features = peer_required_features;
 
+  if (cct->_conf->ms_inject_internal_delays &&
+      cct->_conf->ms_inject_socket_failures) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 10) << __func__ << " sleep for "
+                     << cct->_conf->ms_inject_internal_delays << dendl;
+      utime_t t;
+      t.set_from_double(cct->_conf->ms_inject_internal_delays);
+      t.sleep();
+    }
+  }
+
   CtPtr callback;
   callback = bannerExchangeCallback;
   bannerExchangeCallback = nullptr;
@@ -731,10 +796,18 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
     case Tag::AUTH_DONE:
     case Tag::IDENT:
     case Tag::IDENT_MISSING_FEATURES:
+    case Tag::SESSION_RECONNECT:
+    case Tag::SESSION_RETRY:
+    case Tag::SESSION_RETRY_GLOBAL:
+    case Tag::SESSION_RECONNECT_OK:
     case Tag::KEEPALIVE2:
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
       return READ(next_frame_len, handle_frame_payload);
+    case Tag::SESSION_RESET:
+      return handle_session_reset();
+    case Tag::WAIT:
+      return handle_wait();
     case Tag::MESSAGE:
       return handle_message();
   }
@@ -766,6 +839,14 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
       return handle_ident(buffer, next_frame_len);
     case Tag::IDENT_MISSING_FEATURES:
       return handle_ident_missing_features(buffer, next_frame_len);
+    case Tag::SESSION_RECONNECT:
+      return handle_reconnect(buffer, next_frame_len);
+    case Tag::SESSION_RETRY:
+      return handle_session_retry(buffer, next_frame_len);
+    case Tag::SESSION_RETRY_GLOBAL:
+      return handle_session_retry_global(buffer, next_frame_len);
+    case Tag::SESSION_RECONNECT_OK:
+      return handle_reconnect_ok(buffer, next_frame_len);
     case Tag::KEEPALIVE2:
       return handle_keepalive2(buffer, next_frame_len);
     case Tag::KEEPALIVE2_ACK:
@@ -857,8 +938,9 @@ CtPtr ProtocolV2::ready() {
     }
   }
 
-  state = READY;
+  connection->maybe_start_delay_thread();
 
+  state = READY;
   return CONTINUE(read_frame);
 }
 
@@ -1330,6 +1412,8 @@ CtPtr ProtocolV2::start_client_banner_exchange() {
   ldout(cct, 20) << __func__ << dendl;
   state = CONNECTING;
 
+  global_seq = messenger->get_global_seq();
+
   return _banner_exchange(CONTINUATION(post_client_banner_exchange));
 }
 
@@ -1409,7 +1493,13 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
   ldout(cct, 1) << __func__ << " authentication done,"
                 << " flags=" << auth_done.flags << dendl;
 
-  return send_client_ident();
+  if (!cookie) {
+    ceph_assert(connect_seq == 0);
+    return send_client_ident();
+  } else {  // reconnecting to previous session
+    ceph_assert(connect_seq > 0);
+    return send_reconnect();
+  }
 }
 
 CtPtr ProtocolV2::send_client_ident() {
@@ -1420,21 +1510,21 @@ CtPtr ProtocolV2::send_client_ident() {
     flags |= CEPH_MSG_CONNECT_LOSSY;
   }
 
-  cookie = ceph::util::generate_random_number<uint64_t>(0, -1ll);
-
-  IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(),
-                   connection->policy.features_supported,
-                   connection->policy.features_required, flags, cookie);
+  ClientIdentFrame client_ident(messenger->get_myaddrs(),
+                                messenger->get_myname().num(), global_seq,
+                                connection->policy.features_supported,
+                                connection->policy.features_required, flags);
 
   ldout(cct, 5) << __func__ << " sending identification: "
-                << "addrs: " << ident.addrs << " gid: " << ident.gid
-                << " features_supported: " << std::hex
-                << ident.supported_features
-                << " features_required: " << ident.required_features
-                << " flags: " << ident.flags << " cookie: " << std::dec
-                << ident.cookie << dendl;
-
-  bufferlist &bl = ident.get_buffer();
+                << "addrs=" << messenger->get_myaddrs()
+                << " gid=" << messenger->get_myname().num()
+                << " global_seq=" << global_seq
+                << " features_supported=" << std::hex
+                << connection->policy.features_supported
+                << " features_required=" << connection->policy.features_required
+                << " flags=" << flags << std::dec << dendl;
+
+  bufferlist &bl = client_ident.get_buffer();
   return WRITE(bl, handle_client_ident_write);
 }
 
@@ -1450,6 +1540,31 @@ CtPtr ProtocolV2::handle_client_ident_write(int r) {
   return CONTINUE(read_frame);
 }
 
+CtPtr ProtocolV2::send_reconnect() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  ReconnectFrame reconnect(messenger->get_myaddrs(), cookie, global_seq,
+                           connect_seq, in_seq);
+
+  ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie
+                << " gs=" << global_seq << " cs=" << connect_seq
+                << " ms=" << in_seq << dendl;
+  bufferlist &bl = reconnect.get_buffer();
+  return WRITE(bl, handle_reconnect_write);
+}
+
+CtPtr ProtocolV2::handle_reconnect_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " reconnect write failed r=" << r << " ("
+                  << cpp_strerror(r) << ")" << dendl;
+    return _fault();
+  }
+
+  return CONTINUE(read_frame);
+}
+
 CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
                                                 uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
@@ -1462,23 +1577,106 @@ CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
   return _fault();
 }
 
+CtPtr ProtocolV2::handle_session_reset() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  ldout(cct, 1) << __func__ << " received session reset" << dendl;
+  reset_session();
+
+  return send_client_ident();
+}
+
+CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  RetryFrame retry(payload, length);
+  connect_seq = retry.connect_seq + 1;
+
+  ldout(cct, 1) << __func__
+                << " received session retry connect_seq=" << retry.connect_seq
+                << ", inc to cs=" << connect_seq << dendl;
+
+  return send_reconnect();
+}
+
+CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  RetryGlobalFrame retry(payload, length);
+  global_seq = messenger->get_global_seq(retry.global_seq);
+
+  ldout(cct, 1) << __func__ << " received session retry global global_seq="
+                << retry.global_seq << ", choose new gs=" << global_seq
+                << dendl;
+
+  return send_reconnect();
+}
+
+CtPtr ProtocolV2::handle_wait() {
+  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 1) << __func__ << " received WAIT (connection race)" << dendl;
+  state = WAIT;
+  return _fault();
+}
+
+CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  ReconnectOkFrame reconnect_ok(payload, length);
+  ldout(cct, 5) << __func__
+                << " reconnect accepted: sms=" << reconnect_ok.msg_seq << dendl;
+
+  out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq);
+
+  backoff = utime_t();
+  ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
+                 << ", lossy = " << connection->policy.lossy << ", features "
+                 << connection->get_features() << dendl;
+
+  if (connection->delay_state) {
+    ceph_assert(connection->delay_state->ready());
+  }
+
+  connection->dispatch_queue->queue_connect(connection);
+  messenger->ms_deliver_handle_fast_connect(connection);
+
+  return ready();
+}
+
 CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
 
-  IdentFrame server_ident(payload, length);
+  ServerIdentFrame server_ident(payload, length);
   ldout(cct, 5) << __func__ << " received server identification: "
-                << "addrs: " << server_ident.addrs
-                << " gid: " << server_ident.gid
-                << " features_supported" << std::hex
+                << "addrs=" << server_ident.addrs << " gid=" << server_ident.gid
+                << " global_seq=" << server_ident.global_seq
+                << " features_supported=" << std::hex
                 << server_ident.supported_features
-                << " features_required" << server_ident.required_features
-                << " flags: " << server_ident.flags << " cookie: " << std::dec
+                << " features_required=" << server_ident.required_features
+                << " flags=" << server_ident.flags << " cookie=" << std::dec
                 << server_ident.cookie << dendl;
 
+  cookie = server_ident.cookie;
+
   connection->set_peer_addrs(server_ident.addrs);
   connection->peer_global_id = server_ident.gid;
-  connection->set_features(server_ident.required_features &
+  connection->set_features(server_ident.supported_features &
                            connection->policy.features_supported);
+  peer_global_seq = server_ident.global_seq;
+
+  connection->policy.lossy = server_ident.flags & CEPH_MSG_CONNECT_LOSSY;
+
+  backoff = utime_t();
+  ldout(cct, 10) << __func__ << " connect success " << connect_seq
+                 << ", lossy = " << connection->policy.lossy << ", features "
+                 << connection->get_features() << dendl;
+
+  if (connection->delay_state) {
+    ceph_assert(connection->delay_state->ready());
+  }
+
+  connection->dispatch_queue->queue_connect(connection);
+  messenger->ms_deliver_handle_fast_connect(connection);
 
   return ready();
 }
@@ -1614,16 +1812,15 @@ CtPtr ProtocolV2::handle_auth_done_write(int r) {
 CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
   ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
 
-  IdentFrame client_ident(payload, length);
+  ClientIdentFrame client_ident(payload, length);
 
   ldout(cct, 5) << __func__ << " received client identification: "
-                << "addrs: " << client_ident.addrs
-                << " gid: " << client_ident.gid
-                << " features_supported" << std::hex
+                << "addrs=" << client_ident.addrs << " gid=" << client_ident.gid
+                << " global_seq=" << client_ident.global_seq
+                << " features_supported=" << std::hex
                 << client_ident.supported_features
-                << " features_required: " << client_ident.required_features
-                << " flags: " << client_ident.flags << " cookie: " << std::dec
-                << client_ident.cookie << dendl;
+                << " features_required=" << client_ident.required_features
+                << " flags=" << client_ident.flags << std::dec << dendl;
 
   if (client_ident.addrs.empty()) {
     connection->set_peer_addr(connection->target_addr);
@@ -1631,6 +1828,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
     // Should we check if one of the ident.addrs match connection->target_addr
     // as we do in ProtocolV1?
     connection->set_peer_addrs(client_ident.addrs);
+    connection->target_addr = client_ident.addrs.msgr2_addr();
   }
 
   uint64_t feat_missing = connection->policy.features_required &
@@ -1644,27 +1842,404 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
     return WRITE(bl, handle_ident_missing_features_write);
   }
 
-  state = ACCEPTED_CLIENT_IDENT;
+  connection_features =
+      client_ident.supported_features & connection->policy.features_supported;
+
+  state = ACCEPTING_SESSION;
+  peer_global_seq = client_ident.global_seq;
+
+  // Looks good so far, let's check if there is already an existing connection
+  // to this peer.
+
+  connection->lock.unlock();
+  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
+
+  connection->inject_delay();
+
+  connection->lock.lock();
+  if (state != ACCEPTING_SESSION) {
+    ldout(cct, 1) << __func__
+                  << " state changed while accept, it must be mark_down"
+                  << dendl;
+    ceph_assert(state == CLOSED);
+    return _fault();
+  }
+
+  if (existing) {
+    return handle_existing_connection(existing);
+  }
 
   // if everything is OK reply with server identification
-  connection->peer_global_id = client_ident.gid;
-  cookie = client_ident.cookie;
+  return send_server_ident();
+}
+
+CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
+                  << " (" << cpp_strerror(r) << ")" << dendl;
+    return _fault();
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
+
+  ReconnectFrame reconnect(payload, length);
+
+  ldout(cct, 5) << __func__
+                << " received reconnect: cookie=" << reconnect.cookie
+                << " gs=" << reconnect.global_seq
+                << " cs=" << reconnect.connect_seq
+                << " ms=" << reconnect.msg_seq << dendl;
+
+  if (reconnect.addrs.empty()) {
+    connection->set_peer_addr(connection->target_addr);
+  } else {
+    // Should we check if one of the ident.addrs match connection->target_addr
+    // as we do in ProtocolV1?
+    connection->set_peer_addrs(reconnect.addrs);
+    connection->target_addr = reconnect.addrs.msgr2_addr();
+  }
+
+  connection->lock.unlock();
+  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
+
+  connection->inject_delay();
+
+  connection->lock.lock();
+  if (state != ACCEPTING) {
+    ldout(cct, 1) << __func__
+                  << " state changed while accept, it must be mark_down"
+                  << dendl;
+    ceph_assert(state == CLOSED);
+    return _fault();
+  }
+
+  ResetFrame reset;
+  bufferlist &bl = reset.get_buffer();
+
+  if (!existing) {
+    // there is no existing connection therefore cannot reconnect to previous
+    // session
+    ldout(cct, 0) << __func__
+                  << " no existing connection exists, reseting client" << dendl;
+    return WRITE(bl, handle_session_reset_write);
+  }
+
+  std::lock_guard<std::mutex> l(existing->lock);
+
+  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
+  if (!exproto) {
+    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
+    ceph_assert(false);
+  }
+
+  if (exproto->state == CLOSED) {
+    ldout(cct, 5) << __func__ << " existing " << existing
+                  << " already closed. Reseting client" << dendl;
+    return WRITE(bl, handle_session_reset_write);
+  }
+
+  if (exproto->replacing) {
+    ldout(cct, 1) << __func__
+                  << " existing racing replace happened while replacing."
+                  << " existing=" << existing << dendl;
+    RetryGlobalFrame retry(exproto->peer_global_seq);
+    bufferlist &bl = retry.get_buffer();
+    return WRITE(bl, handle_session_retry_write);
+  }
+
+  if (!exproto->cookie) {
+    // server connection was reseted, reset client
+    ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl;
+    return WRITE(bl, handle_session_reset_write);
+  } else if (exproto->cookie != reconnect.cookie) {
+    ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie
+                  << " cc=" << reconnect.cookie << ", reseting client" << dendl;
+    return WRITE(bl, handle_session_reset_write);
+  }
+
+  if (exproto->peer_global_seq > reconnect.global_seq) {
+    ldout(cct, 5) << __func__
+                  << " stale global_seq: sgs=" << exproto->peer_global_seq
+                  << " cgs=" << reconnect.global_seq
+                  << ", ask client to retry global" << dendl;
+    RetryGlobalFrame retry(exproto->peer_global_seq);
+    bufferlist &bl = retry.get_buffer();
+    return WRITE(bl, handle_session_retry_write);
+  }
+
+  if (exproto->connect_seq >= reconnect.connect_seq) {
+    ldout(cct, 5) << __func__
+                  << " stale connect_seq scs=" << exproto->connect_seq
+                  << " ccs=" << reconnect.connect_seq
+                  << " , ask client to retry" << dendl;
+    RetryFrame retry(exproto->connect_seq);
+    bufferlist &bl = retry.get_buffer();
+    return WRITE(bl, handle_session_retry_write);
+  }
+
+  // everything looks good
+  exproto->connect_seq = reconnect.connect_seq;
+  exproto->message_seq = reconnect.msg_seq;
+
+  return reuse_connection(existing, exproto, true);
+}
+
+CtPtr ProtocolV2::handle_session_reset_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " session reset write failed r=" << r << " ("
+                  << cpp_strerror(r) << ")" << dendl;
+    return _fault();
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_session_retry_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " session retry write failed r=" << r << " ("
+                  << cpp_strerror(r) << ")" << dendl;
+    return _fault();
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
+  ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
+
+  std::lock_guard<std::mutex> l(existing->lock);
+
+  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
+  if (!exproto) {
+    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
+    ceph_assert(false);
+  }
+
+  if (exproto->state == CLOSED) {
+    ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
+                  << dendl;
+    return send_server_ident();
+  }
+
+  if (exproto->replacing) {
+    ldout(cct, 1) << __func__
+                  << " existing racing replace happened while replacing."
+                  << " existing=" << existing << dendl;
+    WaitFrame wait;
+    bufferlist &bl = wait.get_buffer();
+    return WRITE(bl, handle_wait_write);
+  }
+
+  if (existing->policy.lossy) {
+    // existing connection can be thrown out in favor of this one
+    ldout(cct, 1)
+        << __func__
+        << " accept replacing existing (lossy) channel (new one lossy="
+        << connection->policy.lossy << ")" << dendl;
+    existing->protocol->stop();
+    existing->dispatch_queue->queue_reset(existing.get());
+    return send_server_ident();
+  }
+
+  if (exproto->cookie) {  // Found previous session
+    // peer has reseted and we're going to reuse the existing connection
+    // by replacing the communication socket
+    if (connection->policy.resetcheck) {
+      exproto->reset_session();
+    }
+    return reuse_connection(existing, exproto, false);
+  }
+
+  if (exproto->state == READY || exproto->state == STANDBY) {
+    ldout(cct, 10) << __func__
+                   << " existing connection is READY/STANDBY, lets reuse it"
+                   << dendl;
+    return reuse_connection(existing, exproto, false);
+  }
+
+  // Looks like a connection race: server and client are both connecting to
+  // each other at the same time.
+  if (connection->peer_addrs->msgr2_addr() <
+          messenger->get_myaddrs().msgr2_addr() ||
+      existing->policy.server) {
+    // this connection wins
+    ldout(cct, 10) << __func__
+                   << " connection race detected, replacing existing="
+                   << existing << " socket by this connection's socket"
+                   << dendl;
+    return reuse_connection(existing, exproto, false);
+  } else {
+    // the existing connection wins
+    ldout(cct, 10) << __func__
+                   << " connection race detected, this connection loses"
+                   << dendl;
+    ceph_assert(connection->peer_addrs->msgr2_addr() >
+                messenger->get_myaddrs().msgr2_addr());
+    WaitFrame wait;
+    bufferlist &bl = wait.get_buffer();
+    return WRITE(bl, handle_wait_write);
+  }
+}
+
+CtPtr ProtocolV2::handle_wait_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " wait write failed r=" << r << " ("
+                  << cpp_strerror(r) << ")" << dendl;
+    return _fault();
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
+                                   ProtocolV2 *exproto, bool reconnect) {
+  ldout(cct, 20) << __func__ << " existing=" << existing
+                 << " reconnect=" << reconnect << dendl;
+
+  connection->inject_delay();
+
+  std::lock_guard<std::mutex> l(existing->write_lock);
+
+  connection->center->delete_file_event(connection->cs.fd(),
+                                        EVENT_READABLE | EVENT_WRITABLE);
+
+  if (existing->delay_state) {
+    existing->delay_state->flush();
+    ceph_assert(!connection->delay_state);
+  }
+  exproto->reset_recv_state();
+  if (!reconnect) {
+    exproto->peer_global_seq = peer_global_seq;
+    exproto->connection_features = connection_features;
+  }
+
+  auto temp_cs = std::move(connection->cs);
+  EventCenter *new_center = connection->center;
+  Worker *new_worker = connection->worker;
+  // avoid _stop shutdown replacing socket
+  // queue a reset on the new connection, which we're dumping for the old
+  stop();
+
+  connection->dispatch_queue->queue_reset(connection);
+  ldout(messenger->cct, 1) << __func__ << " stop myself to swap existing"
+                           << dendl;
+  exproto->can_write = false;
+  exproto->replacing = true;
+  existing->state_offset = 0;
+  // avoid previous thread modify event
+  exproto->state = NONE;
+  existing->state = AsyncConnection::STATE_NONE;
+  // Discard existing prefetch buffer in `recv_buf`
+  existing->recv_start = existing->recv_end = 0;
+  // there shouldn't exist any buffer
+  ceph_assert(connection->recv_start == connection->recv_end);
+
+  auto deactivate_existing = std::bind(
+      [existing, new_worker, new_center, exproto,
+       reconnect](ConnectedSocket &cs) mutable {
+        // we need to delete time event in original thread
+        {
+          std::lock_guard<std::mutex> l(existing->lock);
+          existing->write_lock.lock();
+          exproto->requeue_sent();
+          existing->outcoming_bl.clear();
+          existing->open_write = false;
+          existing->write_lock.unlock();
+          if (exproto->state == NONE) {
+            existing->shutdown_socket();
+            existing->cs = std::move(cs);
+            existing->worker->references--;
+            new_worker->references++;
+            existing->logger = new_worker->get_perf_counter();
+            existing->worker = new_worker;
+            existing->center = new_center;
+            if (existing->delay_state)
+              existing->delay_state->set_center(new_center);
+          } else if (exproto->state == CLOSED) {
+            auto back_to_close = std::bind(
+                [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
+            new_center->submit_to(new_center->get_id(),
+                                  std::move(back_to_close), true);
+            return;
+          } else {
+            ceph_abort();
+          }
+        }
+
+        // Before changing existing->center, it may already exists some
+        // events in existing->center's queue. Then if we mark down
+        // `existing`, it will execute in another thread and clean up
+        // connection. Previous event will result in segment fault
+        auto transfer_existing = [existing, exproto, reconnect]() mutable {
+          std::lock_guard<std::mutex> l(existing->lock);
+          if (exproto->state == CLOSED) return;
+          ceph_assert(exproto->state == NONE);
+
+          exproto->state = ACCEPTING_SESSION;
+          existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
+          existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
+                                              existing->read_handler);
+          if (!reconnect) {
+            CONTINUATION_RUN2(exproto, exproto->send_server_ident())
+          } else {
+            CONTINUATION_RUN2(exproto, exproto->send_reconnect_ok())
+          }
+        };
+        if (existing->center->in_thread())
+          transfer_existing();
+        else
+          existing->center->submit_to(existing->center->get_id(),
+                                      std::move(transfer_existing), true);
+      },
+      std::move(temp_cs));
+
+  existing->center->submit_to(existing->center->get_id(),
+                              std::move(deactivate_existing), true);
+  return nullptr;
+}
+
+CtPtr ProtocolV2::send_server_ident() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  // this is required for the case when this connection is being replaced
+  out_seq = discard_requeued_up_to(out_seq, 0);
+  in_seq = 0;
+
+  if (!connection->policy.lossy) {
+    cookie = ceph::util::generate_random_number<uint64_t>(0, -1ll);
+  }
 
   uint64_t flags = 0;
   if (connection->policy.lossy) {
     flags = flags | CEPH_MSG_CONNECT_LOSSY;
   }
-  IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(),
-                   connection->policy.features_supported,
-                   connection->policy.features_required, flags, cookie);
+
+  uint64_t gs = messenger->get_global_seq();
+  ServerIdentFrame server_ident(
+      messenger->get_myaddrs(), messenger->get_myname().num(), gs,
+      connection->policy.features_supported,
+      connection->policy.features_required, flags, cookie);
 
   ldout(cct, 5) << __func__ << " sending identification: "
-                << "addrs: " << ident.addrs << " gid: " << ident.gid
-                << " features_supported: " << std::hex
-                << ident.supported_features
-                << " features_required: " << ident.required_features
-                << " flags: " << ident.flags << " cookie: " << std::dec
-                << ident.cookie << dendl;
+                << "addrs=" << messenger->get_myaddrs()
+                << " gid=" << messenger->get_myname().num()
+                << " global_seq=" << gs << " features_supported=" << std::hex
+                << connection->policy.features_supported
+                << " features_required=" << connection->policy.features_required
+                << " flags=" << flags << " cookie=" << std::dec << cookie
+                << dendl;
 
   connection->lock.unlock();
   // Because "replacing" will prevent other connections preempt this addr,
@@ -1674,35 +2249,84 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
   connection->inject_delay();
 
   connection->lock.lock();
+  replacing = false;
 
   if (r < 0) {
     ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
-                  << connection->peer_addrs.msgr2_addr()
+                  << connection->peer_addrs->msgr2_addr()
                   << " just fail later one(this)" << dendl;
-    ldout(cct, 10) << "accept fault after register" << dendl;
     connection->inject_delay();
     return _fault();
   }
-  if (state != ACCEPTED_CLIENT_IDENT) {
+  if (state != ACCEPTING_SESSION) {
     ldout(cct, 1) << __func__
                   << " state changed while accept_conn, it must be mark_down"
                   << dendl;
     ceph_assert(state == CLOSED || state == NONE);
-    ldout(cct, 10) << "accept fault after register" << dendl;
     connection->inject_delay();
     return _fault();
   }
 
-  bufferlist &bl = ident.get_buffer();
-  return WRITE(bl, handle_send_server_ident_write);
+  connection->set_features(connection_features);
+
+  // notify
+  connection->dispatch_queue->queue_accept(connection);
+  messenger->ms_deliver_handle_fast_accept(connection);
+
+  bufferlist &bl = server_ident.get_buffer();
+  return WRITE(bl, handle_server_ident_write);
 }
 
-CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
+CtPtr ProtocolV2::handle_server_ident_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
-                  << " (" << cpp_strerror(r) << ")" << dendl;
+    ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
+                  << cpp_strerror(r) << ")" << dendl;
+    connection->inject_delay();
+    return _fault();
+  }
+
+  if (connection->delay_state) {
+    ceph_assert(connection->delay_state->ready());
+  }
+
+  return ready();
+}
+
+CtPtr ProtocolV2::send_reconnect_ok() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  out_seq = discard_requeued_up_to(out_seq, message_seq);
+
+  uint64_t ms = in_seq;
+  ReconnectOkFrame reconnect_ok(ms);
+
+  ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
+
+  connection->lock.unlock();
+  // Because "replacing" will prevent other connections preempt this addr,
+  // it's safe that here we don't acquire Connection's lock
+  ssize_t r = messenger->accept_conn(connection);
+
+  connection->inject_delay();
+
+  connection->lock.lock();
+  replacing = false;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
+                  << connection->peer_addrs->msgr2_addr()
+                  << " just fail later one(this)" << dendl;
+    connection->inject_delay();
+    return _fault();
+  }
+  if (state != ACCEPTING_SESSION) {
+    ldout(cct, 1) << __func__
+                  << " state changed while accept_conn, it must be mark_down"
+                  << dendl;
+    ceph_assert(state == CLOSED || state == NONE);
+    connection->inject_delay();
     return _fault();
   }
 
@@ -1710,17 +2334,23 @@ CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
   connection->dispatch_queue->queue_accept(connection);
   messenger->ms_deliver_handle_fast_accept(connection);
 
-  return CONTINUE(read_frame);
+  bufferlist &bl = reconnect_ok.get_buffer();
+  return WRITE(bl, handle_reconnect_ok_write);
 }
 
-CtPtr ProtocolV2::handle_send_server_ident_write(int r) {
+CtPtr ProtocolV2::handle_reconnect_ok_write(int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
+    ldout(cct, 1) << __func__ << " reconnect ok write failed r=" << r << " ("
                   << cpp_strerror(r) << ")" << dendl;
+    connection->inject_delay();
     return _fault();
   }
 
+  if (connection->delay_state) {
+    ceph_assert(connection->delay_state->ready());
+  }
+
   return ready();
 }
index 616239d4fd919b9f603251aad2104804da0b5c2b..bd8ff9e4191f0f4d1d3fc1aa281bf682bdaa4a42 100644 (file)
@@ -14,7 +14,7 @@ private:
     CONNECTING,
     START_ACCEPT,
     ACCEPTING,
-    ACCEPTED_CLIENT_IDENT,
+    ACCEPTING_SESSION,
     READY,
     THROTTLE_MESSAGE,
     THROTTLE_BYTES,
@@ -32,7 +32,7 @@ private:
                                       "CONNECTING",
                                       "START_ACCEPT",
                                       "ACCEPTING",
-                                      "ACCEPTED_CLIENT_IDENT",
+                                      "ACCEPTING_SESSION",
                                       "READY",
                                       "THROTTLE_MESSAGE",
                                       "THROTTLE_BYTES",
@@ -45,7 +45,6 @@ private:
     return statenames[state];
   }
 
-public:
   enum class Tag : uint32_t {
     AUTH_REQUEST,
     AUTH_BAD_METHOD,
@@ -54,6 +53,12 @@ public:
     AUTH_DONE,
     IDENT,
     IDENT_MISSING_FEATURES,
+    SESSION_RECONNECT,
+    SESSION_RESET,
+    SESSION_RETRY,
+    SESSION_RETRY_GLOBAL,
+    SESSION_RECONNECT_OK,
+    WAIT,
     MESSAGE,
     KEEPALIVE2,
     KEEPALIVE2_ACK,
@@ -93,12 +98,9 @@ public:
     bufferlist auth_payload;
 
     AuthRequestFrame(uint32_t method, bufferlist &auth_payload)
-        : Frame(Tag::AUTH_REQUEST),
-          method(method),
-          len(auth_payload.length()),
-          auth_payload(auth_payload) {
+        : Frame(Tag::AUTH_REQUEST) {
       encode(method, payload, 0);
-      encode(len, payload, 0);
+      encode(auth_payload.length(), payload, 0);
       payload.claim_append(auth_payload);
     }
 
@@ -115,12 +117,10 @@ public:
     std::vector<__u32> allowed_methods;
 
     AuthBadMethodFrame(uint32_t method, std::vector<__u32> methods)
-        : Frame(Tag::AUTH_BAD_METHOD),
-          method(method),
-          allowed_methods(methods) {
+        : Frame(Tag::AUTH_BAD_METHOD) {
       encode(method, payload, 0);
-      encode((uint32_t)allowed_methods.size(), payload, 0);
-      for (const auto &a_meth : allowed_methods) {
+      encode((uint32_t)methods.size(), payload, 0);
+      for (const auto &a_meth : methods) {
         encode(a_meth, payload, 0);
       }
     }
@@ -141,9 +141,7 @@ public:
     std::string error_msg;
 
     AuthBadAuthFrame(uint32_t error_code, std::string error_msg)
-        : Frame(Tag::AUTH_BAD_AUTH),
-          error_code(error_code),
-          error_msg(error_msg) {
+        : Frame(Tag::AUTH_BAD_AUTH) {
       encode(error_code, payload, 0);
       encode(error_msg, payload, 0);
     }
@@ -159,11 +157,8 @@ public:
     uint32_t len;
     bufferlist auth_payload;
 
-    AuthMoreFrame(bufferlist &auth_payload)
-        : Frame(Tag::AUTH_MORE),
-          len(auth_payload.length()),
-          auth_payload(auth_payload) {
-      encode(len, payload, 0);
+    AuthMoreFrame(bufferlist &auth_payload) : Frame(Tag::AUTH_MORE) {
+      encode(auth_payload.length(), payload, 0);
       payload.claim_append(auth_payload);
     }
 
@@ -177,7 +172,7 @@ public:
   struct AuthDoneFrame : public Frame {
     uint64_t flags;
 
-    AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE), flags(flags) {
+    AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE) {
       encode(flags, payload, 0);
     }
 
@@ -186,42 +181,165 @@ public:
     }
   };
 
-  struct IdentFrame : public SignedEncryptedFrame {
+  struct ClientIdentFrame : public SignedEncryptedFrame {
     entity_addrvec_t addrs;
     int64_t gid;
+    uint64_t global_seq;
     uint64_t supported_features;  // CEPH_FEATURE_*
     uint64_t required_features;   // CEPH_FEATURE_*
     uint64_t flags;               // CEPH_MSG_CONNECT_*
-    uint64_t cookie;
 
-    IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features,
-               uint64_t required_features, uint64_t flags, uint64_t cookie)
-        : SignedEncryptedFrame(Tag::IDENT),
-          addrs(addrs),
-          gid(gid),
-          supported_features(supported_features),
-          required_features(required_features),
-          flags(flags),
-          cookie(cookie) {
+    ClientIdentFrame(const entity_addrvec_t &addrs, int64_t gid,
+                     uint64_t global_seq, uint64_t supported_features,
+                     uint64_t required_features, uint64_t flags)
+        : SignedEncryptedFrame(Tag::IDENT) {
       encode(addrs, payload, -1ll);
       encode(gid, payload, -1ll);
+      encode(global_seq, payload, -1ll);
       encode(supported_features, payload, -1ll);
       encode(required_features, payload, -1ll);
       encode(flags, payload, -1ll);
+    }
+
+    ClientIdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        decode_frame(ti);
+      } catch (const buffer::error &e) {
+      }
+    }
+
+    ClientIdentFrame() : SignedEncryptedFrame() {}
+
+  protected:
+    void decode_frame(ceph::buffer::list::const_iterator &ti) {
+      decode(addrs, ti);
+      decode(gid, ti);
+      decode(global_seq, ti);
+      decode(supported_features, ti);
+      decode(required_features, ti);
+      decode(flags, ti);
+    }
+  };
+
+  struct ServerIdentFrame : public ClientIdentFrame {
+    uint64_t cookie;
+
+    ServerIdentFrame(const entity_addrvec_t &addrs, int64_t gid,
+                     uint64_t global_seq, uint64_t supported_features,
+                     uint64_t required_features, uint64_t flags,
+                     uint64_t cookie)
+        : ClientIdentFrame(addrs, gid, global_seq, supported_features,
+                           required_features, flags) {
       encode(cookie, payload, -1ll);
     }
 
-    IdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+    ServerIdentFrame(char *payload, uint32_t length) : ClientIdentFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        ClientIdentFrame::decode_frame(ti);
+        decode(cookie, ti);
+      } catch (const buffer::error &e) {
+      }
+    }
+  };
+
+  struct ReconnectFrame : public SignedEncryptedFrame {
+    entity_addrvec_t addrs;
+    uint64_t cookie;
+    uint64_t global_seq;
+    uint64_t connect_seq;
+    uint64_t msg_seq;
+
+    ReconnectFrame(const entity_addrvec_t &addrs, uint64_t cookie,
+                   uint64_t global_seq, uint64_t connect_seq, uint64_t msg_seq)
+        : SignedEncryptedFrame(Tag::SESSION_RECONNECT) {
+      encode(addrs, payload, -1ll);
+      encode(cookie, payload, 0);
+      encode(global_seq, payload, 0);
+      encode(connect_seq, payload, 0);
+      encode(msg_seq, payload, 0);
+    }
+
+    ReconnectFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
       bufferlist bl;
       bl.push_back(buffer::create_static(length, payload));
       try {
         auto ti = bl.cbegin();
         decode(addrs, ti);
-        decode(gid, ti);
-        decode(supported_features, ti);
-        decode(required_features, ti);
-        decode(flags, ti);
         decode(cookie, ti);
+        decode(global_seq, ti);
+        decode(connect_seq, ti);
+        decode(msg_seq, ti);
+      } catch (const buffer::error &e) {
+      }
+    }
+  };
+
+  struct ResetFrame : public SignedEncryptedFrame {
+    ResetFrame() : SignedEncryptedFrame(Tag::SESSION_RESET) {}
+  };
+
+  struct RetryFrame : public SignedEncryptedFrame {
+    uint64_t connect_seq;
+
+    RetryFrame(uint64_t connect_seq)
+        : SignedEncryptedFrame(Tag::SESSION_RETRY) {
+      encode(connect_seq, payload);
+    }
+
+    RetryFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        decode(connect_seq, ti);
+      } catch (const buffer::error &e) {
+      }
+    }
+  };
+
+  struct RetryGlobalFrame : public SignedEncryptedFrame {
+    uint64_t global_seq;
+
+    RetryGlobalFrame(uint64_t global_seq)
+        : SignedEncryptedFrame(Tag::SESSION_RETRY_GLOBAL) {
+      encode(global_seq, payload);
+    }
+
+    RetryGlobalFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        decode(global_seq, ti);
+      } catch (const buffer::error &e) {
+      }
+    }
+  };
+
+  struct WaitFrame : public SignedEncryptedFrame {
+    WaitFrame() : SignedEncryptedFrame(Tag::WAIT) {}
+  };
+
+  struct ReconnectOkFrame : public SignedEncryptedFrame {
+    uint64_t msg_seq;
+
+    ReconnectOkFrame(uint64_t msg_seq)
+        : SignedEncryptedFrame(Tag::SESSION_RECONNECT_OK) {
+      encode(msg_seq, payload, 0);
+    }
+
+    ReconnectOkFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        decode(msg_seq, ti);
       } catch (const buffer::error &e) {
       }
     }
@@ -317,12 +435,16 @@ public:
   char *temp_buffer;
   State state;
   uint64_t peer_required_features;
+  uint64_t connection_features;
   uint64_t cookie;
+  uint64_t global_seq;
+  uint64_t connect_seq;
+  uint64_t peer_global_seq;
   uint64_t message_seq;
+  bool replacing;
   bool can_write;
   std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
   std::list<Message *> sent;
-  __u32 connect_seq;
   std::atomic<uint64_t> out_seq{0};
   std::atomic<uint64_t> in_seq{0};
   std::atomic<uint64_t> ack_left{0};
@@ -351,9 +473,11 @@ public:
                         bufferlist &bl);
 
   void requeue_sent();
+  uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
   void reset_recv_state();
   Ct<ProtocolV2> *_fault();
   void discard_out_queue();
+  void reset_session();
   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
   Message *_get_next_outgoing(bufferlist *bl);
   ssize_t write_message(Message *m, bufferlist &bl, bool more);
@@ -434,6 +558,7 @@ private:
   CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
   WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write);
   WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_write);
 
   Ct<ProtocolV2> *start_client_banner_exchange();
   Ct<ProtocolV2> *post_client_banner_exchange();
@@ -444,7 +569,14 @@ private:
   Ct<ProtocolV2> *handle_auth_done(char *payload, uint32_t length);
   Ct<ProtocolV2> *send_client_ident();
   Ct<ProtocolV2> *handle_client_ident_write(int r);
+  Ct<ProtocolV2> *send_reconnect();
+  Ct<ProtocolV2> *handle_reconnect_write(int r);
   Ct<ProtocolV2> *handle_ident_missing_features(char *payload, uint32_t length);
+  Ct<ProtocolV2> *handle_session_reset();
+  Ct<ProtocolV2> *handle_session_retry(char *payload, uint32_t length);
+  Ct<ProtocolV2> *handle_session_retry_global(char *payload, uint32_t length);
+  Ct<ProtocolV2> *handle_wait();
+  Ct<ProtocolV2> *handle_reconnect_ok(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_server_ident(char *payload, uint32_t length);
 
   // Server Protocol
@@ -455,7 +587,11 @@ private:
   WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write);
   WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2,
                                   handle_ident_missing_features_write);
-  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_send_server_ident_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_reset_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_retry_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_wait_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_server_ident_write);
+  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_ok_write);
 
   Ct<ProtocolV2> *start_server_banner_exchange();
   Ct<ProtocolV2> *post_server_banner_exchange();
@@ -465,7 +601,17 @@ private:
   Ct<ProtocolV2> *handle_auth_done_write(int r);
   Ct<ProtocolV2> *handle_client_ident(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
-  Ct<ProtocolV2> *handle_send_server_ident_write(int r);
+  Ct<ProtocolV2> *handle_reconnect(char *payload, uint32_t length);
+  Ct<ProtocolV2> *handle_session_reset_write(int r);
+  Ct<ProtocolV2> *handle_session_retry_write(int r);
+  Ct<ProtocolV2> *handle_existing_connection(AsyncConnectionRef existing);
+  Ct<ProtocolV2> *handle_wait_write(int r);
+  Ct<ProtocolV2> *reuse_connection(AsyncConnectionRef existing,
+                                   ProtocolV2 *exproto, bool reconnect);
+  Ct<ProtocolV2> *send_server_ident();
+  Ct<ProtocolV2> *handle_server_ident_write(int r);
+  Ct<ProtocolV2> *send_reconnect_ok();
+  Ct<ProtocolV2> *handle_reconnect_ok_write(int r);
 };
 
 #endif /* _MSG_ASYNC_PROTOCOL_V2_ */