crimson/net: protocolv2 handshake frame exchanges
author     Yingxin Cheng <yingxincheng@gmail.com>
           Wed, 13 Mar 2019 14:48:12 +0000 (22:48 +0800)
committer  Kefu Chai <kchai@redhat.com>
           Fri, 5 Apr 2019 03:21:19 +0000 (11:21 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Connection.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
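
The diff below adds the real frame exchanges for the msgr2 handshake: a fresh client sends a ClientIdentFrame and dispatches on the server's reply tag, a reconnecting client sends a ReconnectFrame and may be told to retry, reset, or wait, and the server answers with ServerIdentFrame, IdentMissingFeaturesFrame, WaitFrame, RetryFrame, RetryGlobalFrame, ResetFrame, or ReconnectOkFrame. As a rough orientation, the tag dispatch can be modeled without seastar or the frame codecs; the sketch below is illustrative only (the helper functions and return strings are made up, only the tag names come from the patch):

// Standalone model of the client-side tag dispatch implemented in
// ProtocolV2::client_connect() / client_reconnect() below.
// Illustrative only: the real code returns seastar futures and
// decodes ceph::msgr::v2 frames instead of returning strings.
#include <iostream>
#include <string>

enum class Tag {
  IDENT_MISSING_FEATURES, WAIT, SERVER_IDENT,
  SESSION_RETRY, SESSION_RETRY_GLOBAL, SESSION_RESET, SESSION_RECONNECT_OK
};

// What a connecting client does after sending its ClientIdentFrame.
std::string after_client_ident(Tag tag) {
  switch (tag) {
    case Tag::IDENT_MISSING_FEATURES: return "abort: peer requires features we lack";
    case Tag::WAIT:                   return "back off: lost a connection race";
    case Tag::SERVER_IDENT:           return "ready: store server cookie, ms_handle_connect()";
    default:                          return "protocol error: unexpected tag";
  }
}

// What a reconnecting client does after sending its ReconnectFrame.
std::string after_reconnect(Tag tag) {
  switch (tag) {
    case Tag::SESSION_RETRY_GLOBAL:   return "bump global_seq, resend ReconnectFrame";
    case Tag::SESSION_RETRY:          return "bump connect_seq, resend ReconnectFrame";
    case Tag::SESSION_RESET:          return "reset session, fall back to client_connect()";
    case Tag::WAIT:                   return "back off: lost a connection race";
    case Tag::SESSION_RECONNECT_OK:   return "ready: resume message seq, ms_handle_connect()";
    default:                          return "protocol error: unexpected tag";
  }
}

int main() {
  std::cout << after_client_ident(Tag::SERVER_IDENT) << '\n'
            << after_reconnect(Tag::SESSION_RETRY) << '\n';
}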

diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h
index d69d32ab0a10528d0ad3c640a55d47b82cc8bf70..e644447297c2922090c7e18df648b512d2438092 100644
--- a/src/crimson/net/Connection.h
+++ b/src/crimson/net/Connection.h
@@ -28,6 +28,7 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
  protected:
   entity_addr_t peer_addr;
   peer_type_t peer_type = -1;
+  int64_t peer_id = -1;
 
  public:
   uint64_t peer_global_id = 0;
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
index 505a67f8b3b5c05efbcb49101ab5cf8d4d875c33..7a12511b1fd85ab22a79d7f3b37289d39e607de5 100644
--- a/src/crimson/net/ProtocolV2.cc
+++ b/src/crimson/net/ProtocolV2.cc
@@ -4,6 +4,7 @@
 #include "ProtocolV2.h"
 
 #include "include/msgr.h"
+#include "include/random.h"
 
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
@@ -323,6 +324,33 @@ void ProtocolV2::dispatch_reset()
   });
 }
 
+void ProtocolV2::reset_session(bool full)
+{
+  if (full) {
+    server_cookie = 0;
+    connect_seq = 0;
+    conn.in_seq = 0;
+  } else {
+    conn.out_seq = 0;
+    conn.in_seq = 0;
+    client_cookie = 0;
+    server_cookie = 0;
+    connect_seq = 0;
+    peer_global_seq = 0;
+    // TODO:
+    // discard_out_queue();
+    // message_seq = 0;
+    // ack_left = 0;
+    seastar::with_gate(pending_dispatch, [this] {
+      return dispatcher.ms_handle_remote_reset(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+      .handle_exception([this] (std::exception_ptr eptr) {
+        logger().error("{} ms_handle_remote_reset caust exception: {}", conn, eptr);
+      });
+    });
+  }
+}
+
 seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
 {
   // 1. prepare and send banner
@@ -522,62 +550,205 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
 
 seastar::future<bool> ProtocolV2::process_wait()
 {
-//return read_frame_payload()
-//.then([this] (bufferlist payload) {
-//  handle_wait() logic
-//  return false;
-//});
-  return seastar::make_ready_future<bool>(false);
+  return read_frame_payload()
+  .then([this] {
+    // handle_wait() logic
+    logger().debug("{} received WAIT (connection race)", conn);
+    WaitFrame::Decode(rx_segments_data.back());
+    return false;
+  });
 }
 
 seastar::future<bool> ProtocolV2::client_connect()
 {
   // send_client_ident() logic
-  // <prepare and send ClientIdentFrame>
-  // return read_main_preamble()
-  // .then([this] (Tag tag) {
-  //   switch (tag) {
-  //     case Tag::IDENT_MISSING_FEATURES:
-  //       abort_in_fault();
-  //     case Tag::WAIT:
-  //       return process_wait();
-  //     case Tag::SERVER_IDENT:
-  //       <handle ServerIdentFrame>
-           return seastar::make_ready_future<bool>(true);
-  //     default: {
-  //       unexpected_tag(tag, conn, "post_client_connect");
-  //     }
-  //   }
-  // });
+  if (!conn.policy.lossy && !client_cookie) {
+    client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+  }
+
+  // TODO: get socket address and learn(not supported by seastar)
+  entity_addr_t a;
+  a.u.sa.sa_family = AF_INET;
+  a.set_type(entity_addr_t::TYPE_MSGR2);
+  logger().debug("{} learn from addr {}", conn, a);
+  return messenger.learned_addr(a)
+  .then([this] {
+    uint64_t flags = 0;
+    if (conn.policy.lossy) {
+      flags |= CEPH_MSG_CONNECT_LOSSY;
+    }
+
+    auto client_ident = ClientIdentFrame::Encode(
+        messenger.get_myaddrs(),
+        conn.target_addr,
+        messenger.get_myname().num(),
+        global_seq,
+        conn.policy.features_supported,
+        conn.policy.features_required | msgr2_required, flags,
+        client_cookie);
+
+    logger().debug("{} sending identification: addrs={} target={} gid={}"
+                   " global_seq={} features_supported={} features_required={}"
+                   " flags={} cookie={}",
+                   conn, messenger.get_myaddrs(), conn.target_addr,
+                   messenger.get_myname().num(), global_seq,
+                   conn.policy.features_supported,
+                   conn.policy.features_required | msgr2_required,
+                   flags, client_cookie);
+    return write_frame(client_ident);
+  }).then([this] {
+    return read_main_preamble();
+  }).then([this] (Tag tag) {
+    switch (tag) {
+      case Tag::IDENT_MISSING_FEATURES:
+        return read_frame_payload()
+        .then([this] {
+          // handle_ident_missing_features() logic
+          auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+          logger().error("{} client does not support all server features: {}",
+                         conn, ident_missing.features());
+          abort_in_fault();
+          // won't be executed
+          return false;
+        });
+      case Tag::WAIT:
+        return process_wait();
+      case Tag::SERVER_IDENT:
+        return read_frame_payload()
+        .then([this] {
+          // handle_server_ident() logic
+          auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+          logger().debug("{} received server identification:"
+                         " addrs={} gid={} global_seq={}"
+                         " features_supported={} features_required={}"
+                         " flags={} cookie={}",
+                         conn,
+                         server_ident.addrs(), server_ident.gid(),
+                         server_ident.global_seq(),
+                         server_ident.supported_features(),
+                         server_ident.required_features(),
+                         server_ident.flags(), server_ident.cookie());
+
+          // is this who we intended to talk to?
+          // be a bit forgiving here, since we may be connecting based on addresses parsed out
+          // of mon_host or something.
+          if (!server_ident.addrs().contains(conn.target_addr)) {
+            logger().warn("{} peer identifies as {}, does not include {}",
+                          conn, server_ident.addrs(), conn.target_addr);
+            abort_in_fault();
+          }
+
+          server_cookie = server_ident.cookie();
+
+          // TODO: change peer_addr to entity_addrvec_t
+          ceph_assert(conn.peer_addr == server_ident.addrs().front());
+          peer_name = entity_name_t(conn.get_peer_type(), server_ident.gid());
+          conn.set_features(server_ident.supported_features() &
+                            conn.policy.features_supported);
+          peer_global_seq = server_ident.global_seq();
+
+          // TODO: lossless policy
+          ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY);
+          conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+          // TODO: backoff = utime_t();
+          logger().debug("{} connect success {}, lossy={}, features={}",
+                         conn, connect_seq, conn.policy.lossy, conn.features);
+
+          return dispatcher.ms_handle_connect(
+              seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+          .handle_exception([this] (std::exception_ptr eptr) {
+            logger().error("{} ms_handle_connect caust exception: {}", conn, eptr);
+          });
+        }).then([this] {
+          return true;
+        });
+      default: {
+        unexpected_tag(tag, conn, "post_client_connect");
+        // won't be executed
+        return seastar::make_ready_future<bool>(false);
+      }
+    }
+  });
 }
 
 seastar::future<bool> ProtocolV2::client_reconnect()
 {
   // send_reconnect() logic
-  // <prepare ReconnectFrame and send>
-  // <then:>
-  //   return read_main_preamble()
-  //   .then([this] (Tag tag) {
-  //     switch (tag) {
-  //       case Tag::SESSION_RETRY_GLOBAL:
-  //         <handle RetryGlobalFrame>
-  //         return client_reconnect();
-  //       case Tag::SESSION_RETRY:
-  //         <handle RetryFrame>
-  //         return client_reconnect();
-  //       case Tag::SESSION_RESET:
-  //         <handle ResetFrame>
-  //         return client_connect();
-  //       case Tag::WAIT:
-  //         return process_wait();
-  //       case Tag::SESSION_RECONNECT_OK:
-  //         <handle ReconnectOkFrame>
-             return seastar::make_ready_future<bool>(true);
-  //       default: {
-  //         unexpected_tag(tag, conn, "post_client_reconnect");
-  //       }
-  //     }
-  //   });
+  auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
+                                          client_cookie,
+                                          server_cookie,
+                                          global_seq,
+                                          connect_seq,
+                                          conn.in_seq);
+  logger().debug("{} reconnect to session: client_cookie={}"
+                 " server_cookie={} gs={} cs={} ms={}",
+                 conn, client_cookie, server_cookie,
+                 global_seq, connect_seq, conn.in_seq);
+  return write_frame(reconnect)
+  .then([this] {
+    return read_main_preamble();
+  }).then([this] (Tag tag) {
+    switch (tag) {
+      case Tag::SESSION_RETRY_GLOBAL:
+        return read_frame_payload()
+        .then([this] {
+          // handle_session_retry_global() logic
+          auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+          global_seq = messenger.get_global_seq(retry.global_seq());
+          logger().warn("{} received session retry global "
+                        "global_seq={}, choose new gs={}",
+                        conn, retry.global_seq(), global_seq);
+          return client_reconnect();
+        });
+      case Tag::SESSION_RETRY:
+        return read_frame_payload()
+        .then([this] {
+          // handle_session_retry() logic
+          auto retry = RetryFrame::Decode(rx_segments_data.back());
+          connect_seq = retry.connect_seq() + 1;
+          logger().warn("{} received session retry connect_seq={}, inc to cs={}",
+                        conn, retry.connect_seq(), connect_seq);
+          return client_reconnect();
+        });
+      case Tag::SESSION_RESET:
+        return read_frame_payload()
+        .then([this] {
+          // handle_session_reset() logic
+          auto reset = ResetFrame::Decode(rx_segments_data.back());
+          logger().warn("{} received session reset full={}", reset.full());
+          reset_session(reset.full());
+          return client_connect();
+        });
+      case Tag::WAIT:
+        return process_wait();
+      case Tag::SESSION_RECONNECT_OK:
+        return read_frame_payload()
+        .then([this] {
+          // handle_reconnect_ok() logic
+          auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+          logger().debug("{} received reconnect ok:"
+                         "sms={}, lossy={}, features={}",
+                         conn, reconnect_ok.msg_seq(),
+                         connect_seq, conn.policy.lossy, conn.features);
+          // TODO
+          // discard_requeued_up_to()
+          // backoff = utime_t();
+          return dispatcher.ms_handle_connect(
+              seastar::static_pointer_cast<SocketConnection>(
+                conn.shared_from_this()))
+          .handle_exception([this] (std::exception_ptr eptr) {
+            logger().error("{} ms_handle_connect caust exception: {}", conn, eptr);
+          });
+        }).then([this] {
+          return true;
+        });
+      default: {
+        unexpected_tag(tag, conn, "post_client_reconnect");
+        // won't be executed
+        return seastar::make_ready_future<bool>(false);
+      }
+    }
+  });
 }
 
 void ProtocolV2::execute_connecting()
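
One detail worth calling out from client_connect() above: the nonzero session cookie is drawn with ceph::util::generate_random_number<uint64_t>(1, -1ll), where -1ll converts to UINT64_MAX, so the cookie is uniform over [1, 2^64 - 1] and 0 stays reserved to mean "no cookie yet". A minimal standard-library equivalent (the function name here is made up for illustration):

#include <cstdint>
#include <iostream>
#include <limits>
#include <random>

// Draw a nonzero 64-bit session cookie. Mirrors the range used by
// generate_random_number<uint64_t>(1, -1ll): the -1ll argument converts
// to UINT64_MAX, so the draw is uniform over [1, 2^64 - 1].
uint64_t make_session_cookie() {
  static thread_local std::mt19937_64 rng{std::random_device{}()};
  std::uniform_int_distribution<uint64_t> dist(
      1, std::numeric_limits<uint64_t>::max());
  return dist(rng);
}

int main() {
  std::cout << "client_cookie=" << make_session_cookie() << '\n';
}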
@@ -609,9 +780,11 @@ void ProtocolV2::execute_connecting()
         }).then([this] {
           return client_auth();
         }).then([this] {
-          if (1) { // TODO check connect or reconnect
+          if (!server_cookie) {
+            ceph_assert(connect_seq == 0);
             return client_connect();
           } else {
+            ceph_assert(connect_seq > 0);
             // TODO: lossless policy
             ceph_assert(false);
             return client_reconnect();
@@ -725,85 +898,286 @@ seastar::future<> ProtocolV2::server_auth()
 
 seastar::future<bool> ProtocolV2::send_wait()
 {
-  // <prepare and send WaitFrame>
-  // <then:>
-       return seastar::make_ready_future<bool>(false);
+  auto wait = WaitFrame::Encode();
+  return write_frame(wait)
+  .then([this] {
+    return false;
+  });
 }
 
 seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
 {
+  // handle_existing_connection() logic
+  logger().debug("{} {}: {}", conn, __func__, *existing);
+
+  ProtocolV2 *exproto = dynamic_cast<ProtocolV2*>(existing->protocol.get());
+  ceph_assert(exproto);
+
+  if (exproto->state == state_t::CLOSING) {
+    logger().warn("{} existing {} already closed.", conn, *existing);
+    return send_server_ident()
+    .then([this] {
+      return true;
+    });
+  }
+
+  if (exproto->state == state_t::REPLACING) {
+    logger().warn("{} existing racing replace happened while replacing: {}",
+                  conn, *existing);
+    return send_wait();
+  }
+
+  if (exproto->peer_global_seq > peer_global_seq) {
+    logger().warn("{} this is a stale connection, peer_global_seq={}"
+                  "existing->peer_global_seq={} close this connection",
+                  conn, peer_global_seq, exproto->peer_global_seq);
+    dispatch_reset();
+    abort_in_close();
+  }
+
+  if (existing->policy.lossy) {
+    // existing connection can be thrown out in favor of this one
+    logger().warn("{} existing={} is a lossy channel. Close existing in favor of"
+                  " this connection", conn, *existing);
+    exproto->dispatch_reset();
+    exproto->close();
+    return send_server_ident()
+    .then([this] {
+      return true;
+    });
+  }
+
   // TODO: lossless policy
   ceph_assert(false);
 }
 
 seastar::future<bool> ProtocolV2::server_connect()
 {
-  // handle_client_ident() logic
-  // <process ClientIdentFrame>
-  // <case feature missing:>
-  //  <prepare and send IdentMissingFeaturesFrame>
-  //  <then: trigger SERVER_WAIT>
-  //    return seastar::make_ready_future<bool>(false);
-  // <case existing:>
-  //  return handle_existing_connection(existing);
-  // <case everything OK:>
-      return send_server_ident()
+  return read_frame_payload()
+  .then([this] {
+    // handle_client_ident() logic
+    auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+    logger().debug("{} received client identification: addrs={} target={}"
+                   " gid={} global_seq={} features_supported={}"
+                   " features_required={} flags={} cookie={}",
+                   conn, client_ident.addrs(), client_ident.target_addr(),
+                   client_ident.gid(), client_ident.global_seq(),
+                   client_ident.supported_features(),
+                   client_ident.required_features(),
+                   client_ident.flags(), client_ident.cookie());
+
+    if (client_ident.addrs().empty() ||
+        client_ident.addrs().front() == entity_addr_t()) {
+      logger().error("{} oops, client_ident.addrs() is empty", conn);
+      abort_in_fault();
+    }
+    if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
+      logger().error("{} peer is trying to reach {} which is not us ({})",
+                     conn, client_ident.target_addr(), messenger.get_myaddrs());
+      abort_in_fault();
+    }
+    // TODO: change peer_addr to entity_addrvec_t
+    entity_addr_t paddr = client_ident.addrs().front();
+    conn.peer_addr = conn.target_addr;
+    conn.peer_addr.set_type(paddr.get_type());
+    conn.peer_addr.set_port(paddr.get_port());
+    conn.peer_addr.set_nonce(paddr.get_nonce());
+    logger().debug("{} got paddr={}, conn.peer_addr={}", conn, paddr, conn.peer_addr);
+    conn.target_addr = conn.peer_addr;
+
+    peer_name = entity_name_t(conn.get_peer_type(), client_ident.gid());
+    conn.peer_id = client_ident.gid();
+    client_cookie = client_ident.cookie();
+
+    uint64_t feat_missing =
+      (conn.policy.features_required | msgr2_required) &
+      ~(uint64_t)client_ident.supported_features();
+    if (feat_missing) {
+      logger().warn("{} peer missing required features {}", conn, feat_missing);
+      auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+      return write_frame(ident_missing_features)
       .then([this] {
-        // goto ready
-        return true;
+        return false;
       });
+    }
+    connection_features =
+        client_ident.supported_features() & conn.policy.features_supported;
+
+    peer_global_seq = client_ident.global_seq();
+
+    // Looks good so far, let's check if there is already an existing connection
+    // to this peer.
+
+    SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr);
+
+    if (existing) {
+      if (existing->protocol->proto_type != 2) {
+        logger().warn("{} existing {} proto version is {}, close",
+                      conn, *existing, existing->protocol->proto_type);
+        // should unregister the existing from msgr atomically
+        existing->close();
+      } else {
+        return handle_existing_connection(existing);
+      }
+    }
+
+    // if everything is OK reply with server identification
+    return send_server_ident()
+    .then([this] {
+      // goto ready
+      return true;
+    });
+  });
 }
 
 seastar::future<bool> ProtocolV2::read_reconnect()
 {
-  // return read_main_preamble()
-  // .then([this] (Tag tag) {
-  //   expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
-       return server_reconnect();
-  // });
+  return read_main_preamble()
+  .then([this] (Tag tag) {
+    expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+    return server_reconnect();
+  });
 }
 
 seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
 {
-  // <prepare and send RetryFrame>
-  // <then:>
-       return read_reconnect();
+  auto retry = RetryFrame::Encode(connect_seq);
+  return write_frame(retry)
+  .then([this] {
+    return read_reconnect();
+  });
 }
 
 seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
 {
-  // <prepare and send RetryGlobalFrame>
-  // <then:>
-       return read_reconnect();
+  auto retry = RetryGlobalFrame::Encode(global_seq);
+  return write_frame(retry)
+  .then([this] {
+    return read_reconnect();
+  });
 }
 
 seastar::future<bool> ProtocolV2::send_reset(bool full)
 {
-  // <prepare and send ResetFrame>
-  // <then:>
-  //   return read_main_preamble()
-  //   .then([this] (Tag tag) {
-  //     expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
-         return server_connect();
-  //   });
+  auto reset = ResetFrame::Encode(full);
+  return write_frame(reset)
+  .then([this] {
+    return read_main_preamble();
+  }).then([this] (Tag tag) {
+    expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+    return server_connect();
+  });
 }
 
 seastar::future<bool> ProtocolV2::server_reconnect()
 {
-  // handle_reconnect() logic
-  // <process ReconnectFrame>
-  // <case no existing:>
-       return send_reset(0);
-  // <retry global cases:>
-  //   return send_retry_global();
-  // <case wait:>
-  //   return send_wait();
-  // <case retry:>
-  //   return send_retry();
-  // <other reset cases:>
-  //   return send_reset();
-  // TODO: lossless policy
-  //   return reuse_connection(existing, exproto);
+  return read_frame_payload()
+  .then([this] {
+    // handle_reconnect() logic
+    auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+
+    logger().debug("{} received reconnect: client_cookie={} server_cookie={}"
+                   " gs={} cs={} ms={}",
+                   conn, reconnect.client_cookie(), reconnect.server_cookie(),
+                   reconnect.global_seq(), reconnect.connect_seq(),
+                   reconnect.msg_seq());
+
+    // can peer_addrs be changed on-the-fly?
+    if (conn.peer_addr != reconnect.addrs().front()) {
+      logger().error("{} peer identifies as {}, while conn.peer_addr={}",
+                     conn, reconnect.addrs().front(), conn.peer_addr);
+      ceph_assert(false);
+    }
+    // TODO: change peer_addr to entity_addrvec_t
+    ceph_assert(conn.peer_addr == conn.target_addr);
+    peer_global_seq = reconnect.global_seq();
+
+    SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr);
+
+    if (!existing) {
+      // there is no existing connection therefore cannot reconnect to previous
+      // session
+      logger().error("{} server_reconnect: no existing connection,"
+                     " reseting client", conn);
+      return send_reset(true);
+    }
+
+    if (existing->protocol->proto_type != 2) {
+      logger().warn("{} server_reconnect: existing {} proto version is {},"
+                    "close existing and resetting client.", conn, *existing);
+      existing->close();
+      return send_reset(true);
+    }
+
+    ProtocolV2 *exproto = dynamic_cast<ProtocolV2*>(existing->protocol.get());
+    ceph_assert(exproto);
+
+    if (exproto->state == state_t::REPLACING) {
+      logger().warn("{} server_reconnect: existing racing replace happened while "
+                    " replacing, retry_global. existing={}", conn, *existing);
+      return send_retry_global(exproto->peer_global_seq);
+    }
+
+    if (exproto->client_cookie != reconnect.client_cookie()) {
+      logger().warn("{} server_reconnect: existing={} client cookie mismatch,"
+                    " I must have reseted: cc={} rcc={}, reseting client.",
+                    conn, *existing, exproto->client_cookie, reconnect.client_cookie());
+      return send_reset(conn.policy.resetcheck);
+    } else if (exproto->server_cookie == 0) {
+      // this happens when:
+      //   - a connects to b
+      //   - a sends client_ident
+      //   - b gets client_ident, sends server_ident and sets cookie X
+      //   - connection fault
+      //   - b reconnects to a with cookie X, connect_seq=1
+      //   - a has cookie==0
+      logger().warn("{} server_reconnect: I was a client and didn't received the"
+                    " server_ident. Asking peer to resume session establishment",
+                    conn);
+      return send_reset(false);
+    }
+
+    if (exproto->peer_global_seq > reconnect.global_seq()) {
+      logger().debug("{} server_reconnect: stale global_seq: sgs={} cgs={},"
+                     " ask client to retry global",
+                     conn, exproto->peer_global_seq, reconnect.global_seq());
+      return send_retry_global(exproto->peer_global_seq);
+    }
+
+    if (exproto->connect_seq > reconnect.connect_seq()) {
+      logger().debug("{} server_reconnect: stale connect_seq scs={} ccs={},"
+                     " ask client to retry",
+                     conn, exproto->connect_seq, reconnect.connect_seq());
+      return send_retry(exproto->connect_seq);
+    }
+
+    if (exproto->connect_seq == reconnect.connect_seq()) {
+      // reconnect race: both peers are sending reconnect messages
+      if (existing->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
+          !existing->policy.server) {
+        // the existing connection wins
+        logger().warn("{} server_reconnect: reconnect race detected,"
+                      " this connection loses to existing={},"
+                      " ask client to wait", conn, *existing);
+        return send_wait();
+      } else {
+        // this connection wins
+        logger().warn("{} server_reconnect: reconnect race detected,"
+                      " replacing existing={} socket by this connection's socket",
+                      conn, *existing);
+      }
+    }
+
+    logger().warn("{} server_reconnect: reconnect to exsiting={}", conn, *existing);
+
+    // everything looks good
+    exproto->connect_seq = reconnect.connect_seq();
+    //exproto->message_seq = reconnect.msg_seq();
+
+    // TODO: lossless policy
+    // return reuse_connection(existing, exproto);
+    ceph_assert(false);
+  });
 }
 
 void ProtocolV2::execute_accepting()
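
The decision tree in server_reconnect() above arbitrates between the reconnect attempt and an already-registered connection mainly by comparing sequence numbers. A simplified standalone model of that ordering follows; the names are illustrative, and the real code also checks cookies, the REPLACING state, and still has the final reuse path marked TODO:

#include <cstdint>
#include <iostream>
#include <string>

// Simplified model of how server_reconnect() picks a reply tag from the
// peer's ReconnectFrame versus the state of the existing connection.
// Illustrative only: cookie checks and the racing-replace case are omitted.
std::string arbitrate_reconnect(uint64_t existing_peer_gs, uint64_t reconnect_gs,
                                uint64_t existing_cs, uint64_t reconnect_cs,
                                bool existing_wins_race) {
  if (existing_peer_gs > reconnect_gs)
    return "SESSION_RETRY_GLOBAL: stale global_seq, client must pick a newer one";
  if (existing_cs > reconnect_cs)
    return "SESSION_RETRY: stale connect_seq, client must bump it and retry";
  if (existing_cs == reconnect_cs && existing_wins_race)
    return "WAIT: both sides reconnected at once, this attempt backs off";
  return "reuse existing session (ReconnectOkFrame; still a TODO in this patch)";
}

int main() {
  std::cout << arbitrate_reconnect(5, 4, 1, 1, false) << '\n';  // stale global_seq
  std::cout << arbitrate_reconnect(4, 4, 1, 1, true)  << '\n';  // lost reconnect race
}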
@@ -825,17 +1199,21 @@ void ProtocolV2::execute_accepting()
                          conn.policy.standby, conn.policy.resetcheck);
           return server_auth();
         }).then([this] {
-      //  return read_main_preamble()
-      //}).then([this] (Tag tag) {
-      //  switch (tag) {
-      //    case Tag::CLIENT_IDENT:
+          return read_main_preamble();
+        }).then([this] (Tag tag) {
+          switch (tag) {
+            case Tag::CLIENT_IDENT:
               return server_connect();
-      //    case Tag::SESSION_RECONNECT:
-      //      return server_reconnect();
-      //    default: {
-      //      unexpected_tag(tag, conn, "post_server_auth");
-      //    }
-      //  }
+            case Tag::SESSION_RECONNECT:
+              // TODO: lossless policy
+              ceph_assert(false);
+              return server_reconnect();
+            default: {
+              unexpected_tag(tag, conn, "post_server_auth");
+              // won't be executed
+              return seastar::make_ready_future<bool>(false);
+            }
+          }
         }).then([this] (bool proceed_or_wait) {
           if (proceed_or_wait) {
             messenger.register_conn(
@@ -896,9 +1274,52 @@ seastar::future<> ProtocolV2::finish_auth()
 seastar::future<> ProtocolV2::send_server_ident()
 {
   // send_server_ident() logic
-  // <prepare and send ServerIdentFrame>
 
-  return seastar::now();
+  // this is required for the case when this connection is being replaced
+  // TODO
+  // out_seq = discard_requeued_up_to(out_seq, 0);
+  conn.in_seq = 0;
+
+  if (!conn.policy.lossy) {
+    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+  }
+
+  uint64_t flags = 0;
+  if (conn.policy.lossy) {
+    flags = flags | CEPH_MSG_CONNECT_LOSSY;
+  }
+
+  // referring to async-conn v2: do not assign gs to global_seq
+  uint64_t gs = messenger.get_global_seq();
+  auto server_ident = ServerIdentFrame::Encode(
+          messenger.get_myaddrs(),
+          messenger.get_myname().num(),
+          gs,
+          conn.policy.features_supported,
+          conn.policy.features_required | msgr2_required,
+          flags,
+          server_cookie);
+
+  logger().debug("{} sending server identification: addrs={} gid={}"
+                 " global_seq={} features_supported={} features_required={}"
+                 " flags={} cookie={}",
+                 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+                 gs, conn.policy.features_supported,
+                 conn.policy.features_required | msgr2_required,
+                 flags, server_cookie);
+
+  conn.set_features(connection_features);
+
+  // notify
+  seastar::with_gate(pending_dispatch, [this] {
+    return dispatcher.ms_handle_accept(
+        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+    .handle_exception([this] (std::exception_ptr eptr) {
+      logger().error("{} ms_handle_accept caust exception: {}", conn, eptr);
+    });
+  });
+
+  return write_frame(server_ident);
 }
 
 // REPLACING state
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
index 75a49c6cd30ae75d09a2ef2d21b0a3e04fe4016b..08f935470fd788eda6d0ec1f78ab76c02db99ff9 100644
--- a/src/crimson/net/ProtocolV2.h
+++ b/src/crimson/net/ProtocolV2.h
@@ -62,10 +62,15 @@ class ProtocolV2 final : public Protocol {
 
   void trigger_state(state_t state, write_state_t write_state, bool reentrant);
 
+  entity_name_t peer_name;
   uint64_t connection_features = 0;
   uint64_t peer_required_features = 0;
 
+  uint64_t client_cookie = 0;
+  uint64_t server_cookie = 0;
   uint64_t global_seq = 0;
+  uint64_t peer_global_seq = 0;
+  uint64_t connect_seq = 0;
 
  // TODO: Frame related implementations, probably to a separate class.
  private:
@@ -94,6 +99,7 @@ class ProtocolV2 final : public Protocol {
  private:
   seastar::future<> fault();
   void dispatch_reset();
+  void reset_session(bool full);
   seastar::future<entity_type_t, entity_addr_t> banner_exchange();
 
   // CONNECTING (client)