]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/test: implement v2 failover tests with crimson FailoverTestPeer 30162/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 5 Sep 2019 03:30:13 +0000 (11:30 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 5 Sep 2019 05:06:26 +0000 (13:06 +0800)
* Test framework which is able to bring up a local FailoverSuitePeer
  or connect to a remote FailoverSuitePeer;
* Implemented crimson FailoverSuitePeer;
* Test framework which can declare failure cases, messenger operations
  and introspect test results for different kinds of tests ;
* 47 test cases to cover combinations of policies and faults;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index d7d3f2ee8fc412ae714a748b6ddb526f6e130bd5..6342fdee274d5204960b7677043dabea038d58ee 100644 (file)
@@ -1,14 +1,22 @@
 #include "common/ceph_time.h"
 #include "messages/MPing.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
 #include "crimson/auth/DummyAuth.h"
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
+#include "crimson/net/Config.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
+#include "crimson/net/Interceptor.h"
 
 #include <map>
 #include <random>
 #include <boost/program_options.hpp>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
 #include <seastar/core/app-template.hh>
 #include <seastar/core/do_with.hh>
 #include <seastar/core/future-util.hh>
@@ -572,6 +580,1613 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
   });
 }
 
+using ceph::msgr::v2::Tag;
+using ceph::net::Breakpoint;
+using ceph::net::Connection;
+using ceph::net::ConnectionRef;
+using ceph::net::custom_bp_t;
+using ceph::net::Dispatcher;
+using ceph::net::Interceptor;
+using ceph::net::Messenger;
+using ceph::net::SocketPolicy;
+using ceph::net::tag_bp_t;
+
+struct counter_t { unsigned counter = 0; };
+
+enum class conn_state_t {
+  unknown = 0,
+  established,
+  closed,
+  replaced,
+};
+
+std::ostream& operator<<(std::ostream& out, const conn_state_t& state) {
+  switch(state) {
+   case conn_state_t::unknown:
+    return out << "unknown";
+   case conn_state_t::established:
+    return out << "established";
+   case conn_state_t::closed:
+    return out << "closed";
+   case conn_state_t::replaced:
+    return out << "replaced";
+   default:
+    ceph_abort();
+  }
+}
+
+template <typename T>
+void _assert_eq(ConnectionRef conn,
+                unsigned index,
+                const char* expr_actual, T actual,
+                const char* expr_expected, T expected) {
+  if (actual != expected) {
+    throw std::runtime_error(fmt::format(
+          "[{}] {} '{}' is actually {}, not the expected '{}' {}",
+          index, *conn, expr_actual, actual, expr_expected, expected));
+  }
+}
+#define ASSERT_EQUAL(conn, index, actual, expected) \
+  _assert_eq(conn, index, #actual, actual, #expected, expected)
+
+struct ConnResult {
+  ConnectionRef conn;
+  unsigned index;
+  conn_state_t state = conn_state_t::unknown;
+
+  unsigned connect_attempts = 0;
+  unsigned client_connect_attempts = 0;
+  unsigned client_reconnect_attempts = 0;
+  unsigned cnt_connect_dispatched = 0;
+
+  unsigned accept_attempts = 0;
+  unsigned server_connect_attempts = 0;
+  unsigned server_reconnect_attempts = 0;
+  unsigned cnt_accept_dispatched = 0;
+
+  unsigned cnt_reset_dispatched = 0;
+  unsigned cnt_remote_reset_dispatched = 0;
+
+  ConnResult(Connection& conn, unsigned index)
+    : conn(conn.shared_from_this()), index(index) {}
+
+  void assert_state_at(conn_state_t expected) const {
+    ASSERT_EQUAL(conn, index, state, expected);
+  }
+
+  void assert_connect(unsigned attempts,
+                      unsigned connects,
+                      unsigned reconnects,
+                      unsigned dispatched) const {
+    ASSERT_EQUAL(conn, index, connect_attempts, attempts);
+    ASSERT_EQUAL(conn, index, client_connect_attempts, connects);
+    ASSERT_EQUAL(conn, index, client_reconnect_attempts, reconnects);
+    ASSERT_EQUAL(conn, index, cnt_connect_dispatched, dispatched);
+  }
+
+  void assert_accept(unsigned attempts,
+                     unsigned accepts,
+                     unsigned reaccepts,
+                     unsigned dispatched) const {
+    ASSERT_EQUAL(conn, index, accept_attempts, attempts);
+    ASSERT_EQUAL(conn, index, server_connect_attempts, accepts);
+    ASSERT_EQUAL(conn, index, server_reconnect_attempts, reaccepts);
+    ASSERT_EQUAL(conn, index, cnt_accept_dispatched, dispatched);
+  }
+
+  void assert_accept(unsigned attempts,
+                     unsigned total_accepts,
+                     unsigned dispatched) const {
+    ASSERT_EQUAL(conn, index, accept_attempts, attempts);
+    ASSERT_EQUAL(conn, index, server_connect_attempts + server_reconnect_attempts, total_accepts);
+    ASSERT_EQUAL(conn, index, cnt_accept_dispatched, dispatched);
+  }
+
+  void assert_reset(unsigned local, unsigned remote) const {
+    ASSERT_EQUAL(conn, index, cnt_reset_dispatched, local);
+    ASSERT_EQUAL(conn, index, cnt_remote_reset_dispatched, remote);
+  }
+
+  void dump() const {
+    logger().info("\nResult({}):\n"
+                  "  conn: [{}] {}:\n"
+                  "  state: {}\n"
+                  "  connect_attempts: {}\n"
+                  "  client_connect_attempts: {}\n"
+                  "  client_reconnect_attempts: {}\n"
+                  "  cnt_connect_dispatched: {}\n"
+                  "  accept_attempts: {}\n"
+                  "  server_connect_attempts: {}\n"
+                  "  server_reconnect_attempts: {}\n"
+                  "  cnt_accept_dispatched: {}\n"
+                  "  cnt_reset_dispatched: {}\n"
+                  "  cnt_remote_reset_dispatched: {}\n",
+                  this,
+                  index, *conn,
+                  state,
+                  connect_attempts,
+                  client_connect_attempts,
+                  client_reconnect_attempts,
+                  cnt_connect_dispatched,
+                  accept_attempts,
+                  server_connect_attempts,
+                  server_reconnect_attempts,
+                  cnt_accept_dispatched,
+                  cnt_reset_dispatched,
+                  cnt_remote_reset_dispatched);
+  }
+};
+using ConnResults = std::vector<ConnResult>;
+
+struct TestInterceptor : public Interceptor {
+  std::map<Breakpoint, std::set<unsigned>> breakpoints;
+  std::map<Breakpoint, counter_t> breakpoints_counter;
+  std::map<ConnectionRef, unsigned> conns;
+  ConnResults results;
+  std::optional<seastar::promise<>> signal;
+
+  TestInterceptor() = default;
+  // only used for copy breakpoint configurations
+  TestInterceptor(const TestInterceptor& other) {
+    assert(other.breakpoints_counter.empty());
+    assert(other.conns.empty());
+    assert(other.results.empty());
+    breakpoints = other.breakpoints;
+    assert(!other.signal);
+  }
+
+  void make_fault(Breakpoint bp, unsigned round = 1) {
+    assert(round >= 1);
+    breakpoints[bp].insert(round);
+  }
+
+  ConnResult* find_result(ConnectionRef conn) {
+    auto it = conns.find(conn);
+    if (it == conns.end()) {
+      return nullptr;
+    } else {
+      return &results[it->second];
+    }
+  }
+
+  seastar::future<> wait() {
+    assert(!signal);
+    signal = seastar::promise<>();
+    return signal->get_future();
+  }
+
+  void notify() {
+    if (signal) {
+      signal->set_value();
+      signal = std::nullopt;
+    }
+  }
+
+ private:
+  void register_conn(Connection& conn) override {
+    unsigned index = results.size();
+    results.emplace_back(conn, index);
+    conns[conn.shared_from_this()] = index;
+    notify();
+    logger().info("[{}] {} new connection registered", index, conn);
+  }
+
+  void register_conn_closed(Connection& conn) override {
+    auto result = find_result(conn.shared_from_this());
+    if (result == nullptr) {
+      logger().error("Untracked closed connection: {}", conn);
+      ceph_abort();
+    }
+
+    if (result->state != conn_state_t::replaced) {
+      result->state = conn_state_t::closed;
+    }
+    notify();
+    logger().info("[{}] {} closed({})", result->index, conn, result->state);
+  }
+
+  void register_conn_ready(Connection& conn) override {
+    auto result = find_result(conn.shared_from_this());
+    if (result == nullptr) {
+      logger().error("Untracked ready connection: {}", conn);
+      ceph_abort();
+    }
+
+    ceph_assert(conn.is_connected());
+    notify();
+    logger().info("[{}] {} ready", result->index, conn);
+  }
+
+  void register_conn_replaced(Connection& conn) override {
+    auto result = find_result(conn.shared_from_this());
+    if (result == nullptr) {
+      logger().error("Untracked replaced connection: {}", conn);
+      ceph_abort();
+    }
+
+    result->state = conn_state_t::replaced;
+    logger().info("[{}] {} {}", result->index, conn, result->state);
+  }
+
+  bool intercept(Connection& conn, Breakpoint bp) override {
+    ++breakpoints_counter[bp].counter;
+
+    auto result = find_result(conn.shared_from_this());
+    if (result == nullptr) {
+      logger().error("Untracked intercepted connection: {}, at breakpoint {}",
+                     conn, bp);
+      ceph_abort();
+    }
+    logger().info("[{}] {} intercepted {}", result->index, conn, bp);
+
+    if (bp == custom_bp_t::SOCKET_CONNECTING) {
+      ++result->connect_attempts;
+    } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, true}) {
+      ++result->client_connect_attempts;
+    } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, true}) {
+      ++result->client_reconnect_attempts;
+    } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+      ++result->accept_attempts;
+    } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, false}) {
+      ++result->server_connect_attempts;
+    } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, false}) {
+      ++result->server_reconnect_attempts;
+    }
+
+    auto it_bp = breakpoints.find(bp);
+    if (it_bp != breakpoints.end()) {
+      auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+      if (it_cnt != it_bp->second.end()) {
+        return true;
+      }
+    }
+    return false;
+  }
+};
+
+enum class cmd_t : char {
+  none = '\0',
+  shutdown,
+  suite_start,
+  suite_stop,
+  suite_connect_me,
+  suite_send_me,
+  suite_recv_op
+};
+
+enum class policy_t : char {
+  none = '\0',
+  stateful_server,
+  stateless_server,
+  lossless_peer,
+  lossless_peer_reuse,
+  lossy_client,
+  lossless_client
+};
+
+SocketPolicy to_socket_policy(policy_t policy) {
+  switch (policy) {
+   case policy_t::stateful_server:
+    return SocketPolicy::stateful_server(0);
+   case policy_t::stateless_server:
+    return SocketPolicy::stateless_server(0);
+   case policy_t::lossless_peer:
+    return SocketPolicy::lossless_peer(0);
+   case policy_t::lossless_peer_reuse:
+    return SocketPolicy::lossless_peer_reuse(0);
+   case policy_t::lossy_client:
+    return SocketPolicy::lossy_client(0);
+   case policy_t::lossless_client:
+    return SocketPolicy::lossless_client(0);
+   default:
+    logger().error("unexpected policy type");
+    ceph_abort();
+  }
+}
+
+class FailoverSuite : public Dispatcher {
+  ceph::auth::DummyAuthClientServer dummy_auth;
+  Messenger& test_msgr;
+  const entity_addr_t test_peer_addr;
+  TestInterceptor interceptor;
+
+  unsigned tracked_index = 0;
+  ConnectionRef tracked_conn;
+  unsigned pending_send = 0;
+  unsigned pending_peer_receive = 0;
+  unsigned pending_receive = 0;
+
+  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+    auto result = interceptor.find_result(c->shared_from_this());
+    if (result == nullptr) {
+      logger().error("Untracked ms dispatched connection: {}", *c);
+      ceph_abort();
+    }
+
+    if (tracked_conn != c->shared_from_this()) {
+      logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+                     result->index, *c, tracked_index, *tracked_conn);
+      ceph_abort();
+    }
+    ceph_assert(result->index == tracked_index);
+
+    ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+    ceph_assert(pending_receive > 0);
+    --pending_receive;
+    if (pending_receive == 0) {
+      interceptor.notify();
+    }
+    logger().info("[{}] {} got op, pending {} ops", result->index, *c, pending_receive);
+    return seastar::now();
+  }
+
+  seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+    auto result = interceptor.find_result(conn);
+    if (result == nullptr) {
+      logger().error("Untracked accepted connection: {}", *conn);
+      ceph_abort();
+    }
+
+    if (tracked_conn) {
+      logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
+                     result->index, *conn, tracked_index, *tracked_conn);
+      ceph_abort();
+    }
+
+    tracked_index = result->index;
+    tracked_conn = conn;
+    ++result->cnt_accept_dispatched;
+    logger().info("[{}] {} got accepted and tracked, start to send {} ops",
+                  result->index, *conn, pending_send);
+    return flush_pending_send();
+  }
+
+  seastar::future<> ms_handle_connect(ConnectionRef conn) override {
+    auto result = interceptor.find_result(conn);
+    if (result == nullptr) {
+      logger().error("Untracked connected connection: {}", *conn);
+      ceph_abort();
+    }
+
+    if (tracked_conn != conn) {
+      logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
+                     result->index, *conn, tracked_index, *tracked_conn);
+      ceph_abort();
+    }
+    ceph_assert(result->index == tracked_index);
+
+    ++result->cnt_connect_dispatched;
+    logger().info("[{}] {} got connected", result->index, *conn);
+    return seastar::now();
+  }
+
+  seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+    auto result = interceptor.find_result(conn);
+    if (result == nullptr) {
+      logger().error("Untracked reset connection: {}", *conn);
+      ceph_abort();
+    }
+
+    if (tracked_conn != conn) {
+      logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+                     result->index, *conn, tracked_index, *tracked_conn);
+      ceph_abort();
+    }
+    ceph_assert(result->index == tracked_index);
+
+    tracked_index = 0;
+    tracked_conn = nullptr;
+    ++result->cnt_reset_dispatched;
+    logger().info("[{}] {} got reset and untracked", result->index, *conn);
+    return seastar::now();
+  }
+
+  seastar::future<> ms_handle_remote_reset(ConnectionRef conn) override {
+    auto result = interceptor.find_result(conn);
+    if (result == nullptr) {
+      logger().error("Untracked remotely reset connection: {}", *conn);
+      ceph_abort();
+    }
+
+    if (tracked_conn != conn) {
+      logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+                     result->index, *conn, tracked_index, *tracked_conn);
+      ceph_abort();
+    }
+    ceph_assert(result->index == tracked_index);
+
+    logger().info("[{}] {} got remotely reset", result->index, *conn);
+    ++result->cnt_remote_reset_dispatched;
+    return seastar::now();
+  }
+
+ private:
+  seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+    test_msgr.set_default_policy(policy);
+    test_msgr.set_auth_client(&dummy_auth);
+    test_msgr.set_auth_server(&dummy_auth);
+    test_msgr.interceptor = &interceptor;
+    return test_msgr.bind(entity_addrvec_t{addr}).then([this] {
+      return test_msgr.start(this);
+    });
+  }
+
+  seastar::future<> send_op() {
+    ceph_assert(tracked_conn);
+    ++pending_peer_receive;
+    pg_t pgid;
+    object_locator_t oloc;
+    hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                   pgid.pool(), oloc.nspace);
+    spg_t spgid(pgid);
+    return tracked_conn->send(make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0));
+  }
+
+  seastar::future<> flush_pending_send() {
+    ceph_assert(tracked_conn);
+    return seastar::do_until(
+        [this] { return pending_send == 0; },
+        [this] {
+      --pending_send;
+      return send_op();
+    });
+  }
+
+  seastar::future<> wait_ready(unsigned num_conns) {
+    assert(num_conns > 0);
+    if (interceptor.results.size() > num_conns) {
+      throw std::runtime_error(fmt::format(
+            "{} connections, more than expected: {}",
+            interceptor.results.size(), num_conns));
+    }
+
+    for (auto& result : interceptor.results) {
+      if (result.conn->is_closed()) {
+        continue;
+      }
+
+      if (result.conn->is_connected()) {
+        if (tracked_conn != result.conn || tracked_index != result.index) {
+          throw std::runtime_error(fmt::format(
+                "The connected connection [{}] {} doesn't"
+                " match the tracked connection [{}] {}",
+                result.index, *result.conn, tracked_index, tracked_conn));
+        }
+
+        if (pending_send || pending_peer_receive || pending_receive) {
+          logger().info("Waiting for pending_send={} pending_peer_receive={}"
+                        " pending_receive={} from [{}] {}",
+                        pending_send, pending_peer_receive, pending_receive,
+                        result.index, *result.conn);
+          return interceptor.wait().then([this, num_conns] {
+            return wait_ready(num_conns);
+          });
+        } else {
+          result.state = conn_state_t::established;
+        }
+      } else {
+        logger().info("Waiting for connection [{}] {} connected/closed",
+                      result.index, *result.conn);
+        return interceptor.wait().then([this, num_conns] {
+          return wait_ready(num_conns);
+        });
+      }
+    }
+
+    if (interceptor.results.size() < num_conns) {
+      logger().info("Waiting for incoming connection, currently {}, expected {}",
+                    interceptor.results.size(), num_conns);
+      return interceptor.wait().then([this, num_conns] {
+        return wait_ready(num_conns);
+      });
+    }
+
+    logger().debug("Wait done!");
+    return seastar::now();
+  }
+
+ // called by FailoverTest
+ public:
+  FailoverSuite(Messenger& test_msgr,
+                entity_addr_t test_peer_addr,
+                const TestInterceptor& interceptor)
+    : test_msgr(test_msgr),
+      test_peer_addr(test_peer_addr),
+      interceptor(interceptor) { }
+
+  seastar::future<> shutdown() {
+    return test_msgr.shutdown();
+  }
+
+  void needs_receive() {
+    ++pending_receive;
+  }
+
+  void notify_peer_reply() {
+    ceph_assert(pending_peer_receive > 0);
+    --pending_peer_receive;
+    logger().info("TestPeer received op, pending {} peer receive ops",
+                  pending_peer_receive);
+    if (pending_peer_receive == 0) {
+      interceptor.notify();
+    }
+  }
+
+  void post_check() const {
+    // make sure all breakpoints were hit
+    for (auto& kv : interceptor.breakpoints) {
+      auto it = interceptor.breakpoints_counter.find(kv.first);
+      if (it == interceptor.breakpoints_counter.end()) {
+        throw std::runtime_error(fmt::format("{} was missed", kv.first));
+      }
+      auto expected = *std::max_element(kv.second.begin(), kv.second.end());
+      if (expected > it->second.counter) {
+        throw std::runtime_error(fmt::format(
+              "{} only triggered {} times, not the expected {}",
+              kv.first, it->second.counter, expected));
+      }
+    }
+  }
+
+  static seastar::future<std::unique_ptr<FailoverSuite>>
+  create(entity_addr_t test_addr,
+         SocketPolicy test_policy,
+         entity_addr_t test_peer_addr,
+         const TestInterceptor& interceptor) {
+    return Messenger::create(entity_name_t::OSD(2), "Test", 2, 0
+    ).then([test_addr,
+            test_policy,
+            test_peer_addr,
+            interceptor] (Messenger* test_msgr) {
+      auto suite = std::make_unique<FailoverSuite>(
+          *test_msgr, test_peer_addr, interceptor);
+      return suite->init(test_addr, test_policy
+      ).then([suite = std::move(suite)] () mutable {
+        return std::move(suite);
+      });
+    });
+  }
+
+ // called by tests
+ public:
+  seastar::future<> connect_peer() {
+    ceph_assert(!tracked_conn);
+    return test_msgr.connect(test_peer_addr, entity_name_t::TYPE_OSD
+    ).then([this] (auto xconn) {
+      ceph_assert(!tracked_conn);
+
+      auto conn = xconn->release();
+      auto result = interceptor.find_result(conn);
+      ceph_assert(result != nullptr);
+
+      tracked_index = result->index;
+      tracked_conn = conn;
+      return flush_pending_send();
+    });
+  }
+
+  seastar::future<> send_peer() {
+    if (tracked_conn) {
+      ceph_assert(!pending_send);
+      return send_op();
+    } else {
+      ++pending_send;
+      return seastar::now();
+    }
+  }
+
+  seastar::future<std::reference_wrapper<ConnResults>>
+  wait_results(unsigned num_conns) {
+    return wait_ready(num_conns).then([this] {
+      return std::reference_wrapper<ConnResults>(interceptor.results);
+    });
+  }
+};
+
+class FailoverTest : public Dispatcher {
+  ceph::auth::DummyAuthClientServer dummy_auth;
+  Messenger& cmd_msgr;
+  ConnectionRef cmd_conn;
+  const entity_addr_t test_addr;
+  const entity_addr_t test_peer_addr;
+
+  std::optional<seastar::promise<>> recv_pong;
+  std::optional<seastar::promise<>> recv_cmdreply;
+
+  std::unique_ptr<FailoverSuite> test_suite;
+
+  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+    switch (m->get_type()) {
+     case CEPH_MSG_PING:
+      ceph_assert(recv_pong);
+      recv_pong->set_value();
+      recv_pong = std::nullopt;
+      return seastar::now();
+     case MSG_COMMAND_REPLY:
+      ceph_assert(recv_cmdreply);
+      recv_cmdreply->set_value();
+      recv_cmdreply = std::nullopt;
+      return seastar::now();
+     case MSG_COMMAND: {
+      auto m_cmd = boost::static_pointer_cast<MCommand>(m);
+      ceph_assert(static_cast<cmd_t>(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op);
+      ceph_assert(test_suite);
+      test_suite->notify_peer_reply();
+      return seastar::now();
+     }
+     default:
+      logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
+      ceph_abort();
+    }
+  }
+
+ private:
+  seastar::future<> prepare_cmd(
+      cmd_t cmd,
+      std::function<void(ceph::ref_t<MCommand>)>
+        f_prepare = [] (auto m) { return; }) {
+    assert(!recv_cmdreply);
+    recv_cmdreply  = seastar::promise<>();
+    auto fut = recv_cmdreply->get_future();
+    auto m = make_message<MCommand>();
+    m->cmd.emplace_back(1, static_cast<char>(cmd));
+    f_prepare(m);
+    return cmd_conn->send(m).then([fut = std::move(fut)] () mutable {
+      return std::move(fut);
+    });
+  }
+
+  seastar::future<> start_peer(policy_t peer_policy) {
+    return prepare_cmd(cmd_t::suite_start,
+        [peer_policy] (auto m) {
+      m->cmd.emplace_back(1, static_cast<char>(peer_policy));
+    });
+  }
+
+  seastar::future<> stop_peer() {
+    return prepare_cmd(cmd_t::suite_stop);
+  }
+
+  seastar::future<> pingpong() {
+    assert(!recv_pong);
+    recv_pong = seastar::promise<>();
+    auto fut = recv_pong->get_future();
+    return cmd_conn->send(make_message<MPing>()
+    ).then([this, fut = std::move(fut)] () mutable {
+      return std::move(fut);
+    });
+  }
+
+  seastar::future<> init(entity_addr_t cmd_peer_addr) {
+    cmd_msgr.set_default_policy(SocketPolicy::lossy_client(0));
+    cmd_msgr.set_auth_client(&dummy_auth);
+    cmd_msgr.set_auth_server(&dummy_auth);
+    return cmd_msgr.start(this).then([this, cmd_peer_addr] {
+      return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
+    }).then([this] (auto conn) {
+      cmd_conn = conn->release();
+      return pingpong();
+    });
+  }
+
+ public:
+  FailoverTest(Messenger& cmd_msgr,
+               entity_addr_t test_addr,
+               entity_addr_t test_peer_addr)
+    : cmd_msgr(cmd_msgr),
+      test_addr(test_addr),
+      test_peer_addr(test_peer_addr) { }
+
+  seastar::future<> shutdown() {
+    logger().info("CmdCli shutdown...");
+    assert(!recv_cmdreply);
+    auto m = make_message<MCommand>();
+    m->cmd.emplace_back(1, static_cast<char>(cmd_t::shutdown));
+    return cmd_conn->send(m).then([this] {
+      return seastar::sleep(200ms);
+    }).finally([this] {
+      return cmd_msgr.shutdown();
+    });
+  }
+
+  static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
+  create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
+    assert(cmd_peer_addr.is_msgr2());
+    return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0
+    ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) {
+      entity_addr_t test_peer_addr = cmd_peer_addr;
+      test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+      test_peer_addr.set_nonce(4);
+      auto test = seastar::make_lw_shared<FailoverTest>(
+          *cmd_msgr, test_addr, test_peer_addr);
+      return test->init(cmd_peer_addr).then([test] {
+        logger().info("CmdCli ready");
+        return test;
+      });
+    });
+  }
+
+ // called by tests
+ public:
+  seastar::future<> run_suite(
+      std::string name,
+      const TestInterceptor& interceptor,
+      policy_t test_policy,
+      policy_t peer_policy,
+      std::function<seastar::future<>(FailoverSuite&)>&& f) {
+    logger().info("\n\n[{}]", name);
+    ceph_assert(!test_suite);
+    SocketPolicy test_policy_ = to_socket_policy(test_policy);
+    return FailoverSuite::create(
+        test_addr, test_policy_, test_peer_addr, interceptor
+    ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
+      test_suite.swap(suite);
+      return start_peer(peer_policy).then([this, f = std::move(f)] {
+        return f(*test_suite);
+      }).then([this] {
+        test_suite->post_check();
+        logger().info("\n[SUCCESS]");
+      }).handle_exception([] (auto eptr) {
+        logger().info("\n[FAIL: {}]", eptr);
+        throw;
+      }).finally([this] {
+        return stop_peer();
+      }).finally([this] {
+        return test_suite->shutdown().then([this] {
+          test_suite.reset();
+        });
+      });
+    });
+  }
+
+  seastar::future<> peer_connect_me() {
+    return prepare_cmd(cmd_t::suite_connect_me,
+        [this] (auto m) {
+      m->cmd.emplace_back(fmt::format("{}", test_addr));
+    });
+  }
+
+  seastar::future<> peer_send_me() {
+    ceph_assert(test_suite);
+    test_suite->needs_receive();
+    return prepare_cmd(cmd_t::suite_send_me);
+  }
+
+  seastar::future<> send_bidirectional() {
+    ceph_assert(test_suite);
+    return test_suite->send_peer().then([this] {
+      return peer_send_me();
+    });
+  }
+};
+
+class FailoverSuitePeer : public Dispatcher {
+  using cb_t = std::function<seastar::future<>()>;
+  ceph::auth::DummyAuthClientServer dummy_auth;
+  Messenger& peer_msgr;
+  cb_t op_callback;
+
+  ConnectionRef tracked_conn;
+  unsigned pending_send = 0;
+
+  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+    ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+    ceph_assert(tracked_conn == c->shared_from_this());
+    return op_callback();
+  }
+
+  seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+    ceph_assert(!tracked_conn);
+    tracked_conn = conn;
+    return flush_pending_send();
+  }
+
+  seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+    ceph_assert(tracked_conn == conn);
+    tracked_conn = nullptr;
+    return seastar::now();
+  }
+
+ private:
+  seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+    peer_msgr.set_default_policy(policy);
+    peer_msgr.set_auth_client(&dummy_auth);
+    peer_msgr.set_auth_server(&dummy_auth);
+    return peer_msgr.bind(entity_addrvec_t{addr}).then([this] {
+      return peer_msgr.start(this);
+    });
+  }
+
+  seastar::future<> send_op() {
+    ceph_assert(tracked_conn);
+    pg_t pgid;
+    object_locator_t oloc;
+    hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                   pgid.pool(), oloc.nspace);
+    spg_t spgid(pgid);
+    return tracked_conn->send(make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0));
+  }
+
+  seastar::future<> flush_pending_send() {
+    ceph_assert(tracked_conn);
+    return seastar::do_until(
+        [this] { return pending_send == 0; },
+        [this] {
+      --pending_send;
+      return send_op();
+    });
+  }
+
+ public:
+  FailoverSuitePeer(Messenger& peer_msgr, cb_t op_callback)
+    : peer_msgr(peer_msgr), op_callback(op_callback) { }
+
+  seastar::future<> shutdown() {
+    return peer_msgr.shutdown();
+  }
+
+  seastar::future<> connect(entity_addr_t addr) {
+    ceph_assert(!tracked_conn);
+    return peer_msgr.connect(addr, entity_name_t::TYPE_OSD
+    ).then([this] (auto xconn) {
+      ceph_assert(!tracked_conn);
+      tracked_conn = xconn->release();
+      return flush_pending_send();
+    });
+  }
+
+  seastar::future<> send_peer() {
+    if (tracked_conn) {
+      return send_op();
+    } else {
+      ++pending_send;
+      return seastar::now();
+    }
+  }
+
+  static seastar::future<std::unique_ptr<FailoverSuitePeer>>
+  create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
+    return Messenger::create(entity_name_t::OSD(4), "TestPeer", 4, 0
+    ).then([addr, policy, op_callback] (Messenger* peer_msgr) {
+      auto suite = std::make_unique<FailoverSuitePeer>(*peer_msgr, op_callback);
+      return suite->init(addr, policy
+      ).then([suite = std::move(suite)] () mutable {
+        return std::move(suite);
+      });
+    });
+  }
+};
+
+class FailoverTestPeer : public Dispatcher {
+  ceph::auth::DummyAuthClientServer dummy_auth;
+  Messenger& cmd_msgr;
+  ConnectionRef cmd_conn;
+  std::unique_ptr<FailoverSuitePeer> test_suite;
+
+  seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+    ceph_assert(cmd_conn == c->shared_from_this());
+    switch (m->get_type()) {
+     case CEPH_MSG_PING:
+      return c->send(make_message<MPing>());
+     case MSG_COMMAND: {
+      auto m_cmd = boost::static_pointer_cast<MCommand>(m);
+      auto cmd = static_cast<cmd_t>(m_cmd->cmd[0][0]);
+      if (cmd == cmd_t::shutdown) {
+        logger().info("CmdSrv shutdown...");
+        cmd_msgr.shutdown();
+        return seastar::now();
+      }
+      return handle_cmd(cmd, m_cmd).then([c] {
+        return c->send(make_message<MCommandReply>());
+      });
+     }
+     default:
+      logger().error("{} got unexpected msg from cmd client: {}", *c, m);
+      ceph_abort();
+    }
+  }
+
+  seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+    cmd_conn = conn;
+    return seastar::now();
+  }
+
+ private:
+  seastar::future<> notify_recv_op() {
+    ceph_assert(cmd_conn);
+    auto m = make_message<MCommand>();
+    m->cmd.emplace_back(1, static_cast<char>(cmd_t::suite_recv_op));
+    return cmd_conn->send(m);
+  }
+
+  seastar::future<> handle_cmd(cmd_t cmd, MRef<MCommand> m_cmd) {
+    switch (cmd) {
+     case cmd_t::suite_start: {
+      ceph_assert(!test_suite);
+      // suite bind to cmd_addr, with port + 1
+      auto test_peer_addr = get_addr();
+      test_peer_addr.set_port(get_addr().get_port() + 1);
+      auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
+      return FailoverSuitePeer::create(test_peer_addr, policy,
+                                       [this] { return notify_recv_op(); }
+      ).then([this] (auto suite) {
+        test_suite.swap(suite);
+      });
+     }
+     case cmd_t::suite_stop:
+      ceph_assert(test_suite);
+      return test_suite->shutdown().then([this] {
+        test_suite.reset();
+      });
+     case cmd_t::suite_connect_me: {
+      ceph_assert(test_suite);
+      entity_addr_t test_addr = entity_addr_t();
+      test_addr.parse(m_cmd->cmd[1].c_str(), nullptr);
+      return test_suite->connect(test_addr);
+     }
+     case cmd_t::suite_send_me:
+      ceph_assert(test_suite);
+      return test_suite->send_peer();
+     default:
+      logger().error("TestPeer got unexpected command {} from Test", m_cmd);
+      ceph_abort();
+      return seastar::now();
+    }
+  }
+
+  seastar::future<> init(entity_addr_t cmd_addr) {
+    cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0));
+    cmd_msgr.set_auth_client(&dummy_auth);
+    cmd_msgr.set_auth_server(&dummy_auth);
+    return cmd_msgr.bind(entity_addrvec_t{cmd_addr}).then([this] {
+      return cmd_msgr.start(this);
+    });
+  }
+
+ public:
+  FailoverTestPeer(Messenger& cmd_msgr)
+    : cmd_msgr(cmd_msgr) { }
+
+  entity_addr_t get_addr() const {
+    return cmd_msgr.get_myaddr();
+  }
+
+  seastar::future<> wait() {
+    return cmd_msgr.wait();
+  }
+
+  static seastar::future<std::unique_ptr<FailoverTestPeer>> create() {
+    return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0
+    ).then([] (Messenger* cmd_msgr) {
+      entity_addr_t cmd_addr;
+      cmd_addr.parse("v2:127.0.0.1:9011", nullptr);
+      auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr);
+      return test_peer->init(cmd_addr
+      ).then([test_peer = std::move(test_peer)] () mutable {
+        logger().info("CmdSrv ready");
+        return std::move(test_peer);
+      });
+    });
+  }
+};
+
+seastar::future<>
+test_v2_lossy_early_connect_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {custom_bp_t::BANNER_WRITE},
+      {custom_bp_t::BANNER_READ},
+      {custom_bp_t::BANNER_PAYLOAD_READ},
+      {custom_bp_t::SOCKET_CONNECTING},
+      {Tag::HELLO, true},
+      {Tag::HELLO, false},
+      {Tag::AUTH_REQUEST, true},
+      {Tag::AUTH_DONE, false},
+      {Tag::AUTH_SIGNATURE, true},
+      {Tag::AUTH_SIGNATURE, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossy_early_connect_fault -- {}", bp),
+          interceptor,
+          policy_t::lossy_client,
+          policy_t::stateless_server,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(2, 1, 0, 1);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossy_connect_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::CLIENT_IDENT, true},
+      {Tag::SERVER_IDENT, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossy_connect_fault -- {}", bp),
+          interceptor,
+          policy_t::lossy_client,
+          policy_t::stateless_server,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&suite] {
+          return suite.send_peer();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(2, 2, 0, 1);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossy_connected_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::MESSAGE, true},
+      {Tag::MESSAGE, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossy_connected_fault -- {}", bp),
+          interceptor,
+          policy_t::lossy_client,
+          policy_t::stateless_server,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::closed);
+          results[0].assert_connect(1, 1, 0, 1);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(1, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossy_early_accept_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {custom_bp_t::BANNER_WRITE},
+      {custom_bp_t::BANNER_READ},
+      {custom_bp_t::BANNER_PAYLOAD_READ},
+      {Tag::HELLO, true},
+      {Tag::HELLO, false},
+      {Tag::AUTH_REQUEST, false},
+      {Tag::AUTH_DONE, true},
+      {Tag::AUTH_SIGNATURE, true},
+      {Tag::AUTH_SIGNATURE, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossy_early_accept_fault -- {}", bp),
+          interceptor,
+          policy_t::stateless_server,
+          policy_t::lossy_client,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::closed);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::established);
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 1, 0, 1);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossy_accept_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::CLIENT_IDENT, false},
+      {Tag::SERVER_IDENT, true},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossy_accept_fault -- {}", bp),
+          interceptor,
+          policy_t::stateless_server,
+          policy_t::lossy_client,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::closed);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 1, 0, 0);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::established);
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 1, 0, 1);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossy_accepted_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::MESSAGE, true},
+      {Tag::MESSAGE, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossy_accepted_fault -- {}", bp),
+          interceptor,
+          policy_t::stateless_server,
+          policy_t::lossy_client,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::closed);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 1, 0, 1);
+          results[0].assert_reset(1, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossless_connect_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::CLIENT_IDENT, true},
+      {Tag::SERVER_IDENT, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossless_connect_fault -- {}", bp),
+          interceptor,
+          policy_t::lossless_client,
+          policy_t::stateful_server,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(2, 2, 0, 1);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossless_connected_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::MESSAGE, true},
+      {Tag::MESSAGE, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossless_connected_fault -- {}", bp),
+          interceptor,
+          policy_t::lossless_client,
+          policy_t::stateful_server,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(2, 1, 1, 2);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossless_reconnect_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<std::pair<Breakpoint, Breakpoint>>{
+      {{Tag::MESSAGE, true}, {Tag::SESSION_RECONNECT, true}},
+      {{Tag::MESSAGE, true}, {Tag::SESSION_RECONNECT_OK, false}},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp_pair.first);
+      interceptor.make_fault(bp_pair.second);
+      return test.run_suite(
+          fmt::format("test_v2_lossless_reconnect_fault -- {}, {}",
+                      bp_pair.first, bp_pair.second),
+          interceptor,
+          policy_t::lossless_client,
+          policy_t::stateful_server,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(3, 1, 2, 2);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossless_accept_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::CLIENT_IDENT, false},
+      {Tag::SERVER_IDENT, true},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossless_accept_fault -- {}", bp),
+          interceptor,
+          policy_t::stateful_server,
+          policy_t::lossless_client,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::closed);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 1, 0, 0);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::established);
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 1, 0, 1);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossless_accepted_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::MESSAGE, true},
+      {Tag::MESSAGE, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_lossless_accepted_fault -- {}", bp),
+          interceptor,
+          policy_t::stateful_server,
+          policy_t::lossless_client,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 1, 0, 1);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::replaced);
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 1, 0);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_lossless_reaccept_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<std::pair<Breakpoint, Breakpoint>>{
+      {{Tag::MESSAGE, false}, {Tag::SESSION_RECONNECT, false}},
+      {{Tag::MESSAGE, false}, {Tag::SESSION_RECONNECT_OK, true}},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp_pair.first);
+      interceptor.make_fault(bp_pair.second);
+      return test.run_suite(
+          fmt::format("test_v2_lossless_reaccept_fault -- {}, {}",
+                      bp_pair.first, bp_pair.second),
+          interceptor,
+          policy_t::stateful_server,
+          policy_t::lossless_client,
+          [&test, bp = bp_pair.second] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.send_bidirectional();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(3);
+        }).then([bp] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 1, 0, 1);
+          results[0].assert_reset(0, 0);
+          if (bp == Breakpoint{Tag::SESSION_RECONNECT, false}) {
+            results[1].assert_state_at(conn_state_t::closed);
+          } else {
+            results[1].assert_state_at(conn_state_t::replaced);
+          }
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 0, 1, 0);
+          results[1].assert_reset(0, 0);
+          results[2].assert_state_at(conn_state_t::replaced);
+          results[2].assert_connect(0, 0, 0, 0);
+          results[2].assert_accept(1, 0, 1, 0);
+          results[2].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_peer_connect_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::CLIENT_IDENT, true},
+      {Tag::SERVER_IDENT, false},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_peer_connect_fault -- {}", bp),
+          interceptor,
+          policy_t::lossless_peer,
+          policy_t::lossless_peer,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&suite] {
+          return suite.send_peer();
+        }).then([&suite] {
+          return suite.connect_peer();
+        }).then([&suite] {
+          return suite.wait_results(1);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::established);
+          results[0].assert_connect(2, 2, 0, 1);
+          results[0].assert_accept(0, 0, 0, 0);
+          results[0].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_peer_accept_fault(FailoverTest& test) {
+  return seastar::do_with(std::vector<Breakpoint>{
+      {Tag::CLIENT_IDENT, false},
+      {Tag::SERVER_IDENT, true},
+  }, [&test] (auto& failure_cases) {
+    return seastar::do_for_each(failure_cases, [&test] (auto bp) {
+      TestInterceptor interceptor;
+      interceptor.make_fault(bp);
+      return test.run_suite(
+          fmt::format("test_v2_peer_accept_fault -- {}", bp),
+          interceptor,
+          policy_t::lossless_peer,
+          policy_t::lossless_peer,
+          [&test] (FailoverSuite& suite) {
+        return seastar::futurize_apply([&test] {
+          return test.peer_send_me();
+        }).then([&test] {
+          return test.peer_connect_me();
+        }).then([&suite] {
+          return suite.wait_results(2);
+        }).then([] (ConnResults& results) {
+          results[0].assert_state_at(conn_state_t::closed);
+          results[0].assert_connect(0, 0, 0, 0);
+          results[0].assert_accept(1, 1, 0, 0);
+          results[0].assert_reset(0, 0);
+          results[1].assert_state_at(conn_state_t::established);
+          results[1].assert_connect(0, 0, 0, 0);
+          results[1].assert_accept(1, 1, 0, 1);
+          results[1].assert_reset(0, 0);
+        });
+      });
+    });
+  });
+}
+
+seastar::future<>
+test_v2_peer_connected_fault_reconnect(FailoverTest& test) {
+  auto bp = Breakpoint{Tag::MESSAGE, true};
+  TestInterceptor interceptor;
+  interceptor.make_fault(bp);
+  return test.run_suite(
+      fmt::format("test_v2_peer_connected_fault_reconnect -- {}", bp),
+      interceptor,
+      policy_t::lossless_peer,
+      policy_t::lossless_peer,
+      [&test] (FailoverSuite& suite) {
+    return seastar::futurize_apply([&suite] {
+      return suite.send_peer();
+    }).then([&suite] {
+      return suite.connect_peer();
+    }).then([&suite] {
+      return suite.wait_results(1);
+    }).then([] (ConnResults& results) {
+      results[0].assert_state_at(conn_state_t::established);
+      results[0].assert_connect(2, 1, 1, 2);
+      results[0].assert_accept(0, 0, 0, 0);
+      results[0].assert_reset(0, 0);
+    });
+  });
+}
+
+seastar::future<>
+test_v2_peer_connected_fault_reaccept(FailoverTest& test) {
+  auto bp = Breakpoint{Tag::MESSAGE, false};
+  TestInterceptor interceptor;
+  interceptor.make_fault(bp);
+  return test.run_suite(
+      fmt::format("test_v2_peer_connected_fault_reaccept -- {}", bp),
+      interceptor,
+      policy_t::lossless_peer,
+      policy_t::lossless_peer,
+      [&test] (FailoverSuite& suite) {
+    return seastar::futurize_apply([&test] {
+      return test.peer_send_me();
+    }).then([&suite] {
+      return suite.connect_peer();
+    }).then([&suite] {
+      return suite.wait_results(2);
+    }).then([] (ConnResults& results) {
+      results[0].assert_state_at(conn_state_t::established);
+      results[0].assert_connect(1, 1, 0, 1);
+      results[0].assert_accept(0, 0, 0, 0);
+      results[0].assert_reset(0, 0);
+      results[1].assert_state_at(conn_state_t::replaced);
+      results[1].assert_connect(0, 0, 0, 0);
+      results[1].assert_accept(1, 0, 1, 0);
+      results[1].assert_reset(0, 0);
+    });
+  });
+}
+
+seastar::future<>
+test_v2_failover(entity_addr_t test_addr = entity_addr_t(),
+                 entity_addr_t cmd_peer_addr = entity_addr_t()) {
+  if (test_addr == entity_addr_t() || cmd_peer_addr == entity_addr_t()) {
+    // initiate crimson test peer locally
+    logger().info("test_v2_failover: start local TestPeer...");
+    return FailoverTestPeer::create().then([] (auto peer) {
+      entity_addr_t test_addr_;
+      test_addr_.parse("v2:127.0.0.1:9010");
+      return test_v2_failover(test_addr_, peer->get_addr()
+      ).finally([peer = std::move(peer)] () mutable {
+        return peer->wait().then([peer = std::move(peer)] {});
+      });
+    }).handle_exception([] (auto eptr) {
+      logger().error("FailoverTestPeer: got exception {}", eptr);
+      throw;
+    });
+  }
+
+  test_addr.set_nonce(2);
+  return FailoverTest::create(cmd_peer_addr, test_addr).then([] (auto test) {
+    return seastar::futurize_apply([test] {
+      return test_v2_lossy_early_connect_fault(*test);
+    }).then([test] {
+      return test_v2_lossy_connect_fault(*test);
+    }).then([test] {
+      return test_v2_lossy_connected_fault(*test);
+    }).then([test] {
+      return test_v2_lossy_early_accept_fault(*test);
+    }).then([test] {
+      return test_v2_lossy_accept_fault(*test);
+    }).then([test] {
+      return test_v2_lossy_accepted_fault(*test);
+    }).then([test] {
+      return test_v2_lossless_connect_fault(*test);
+    }).then([test] {
+      return test_v2_lossless_connected_fault(*test);
+    }).then([test] {
+      return test_v2_lossless_reconnect_fault(*test);
+    }).then([test] {
+      return test_v2_lossless_accept_fault(*test);
+    }).then([test] {
+      return test_v2_lossless_accepted_fault(*test);
+    }).then([test] {
+      return test_v2_lossless_reaccept_fault(*test);
+    }).then([test] {
+      return test_v2_peer_connect_fault(*test);
+    }).then([test] {
+      return test_v2_peer_accept_fault(*test);
+    }).then([test] {
+      return test_v2_peer_connected_fault_reconnect(*test);
+    }).then([test] {
+      return test_v2_peer_connected_fault_reaccept(*test);
+    }).finally([test] {
+      return test->shutdown().then([test] {});
+    });
+  }).handle_exception([] (auto eptr) {
+    logger().error("FailoverTest: got exception {}", eptr);
+    throw;
+  });
+}
+
 }
 
 int main(int argc, char** argv)
@@ -600,6 +2215,8 @@ int main(int argc, char** argv)
       return test_preemptive_shutdown(false);
     }).then([] {
       return test_preemptive_shutdown(true);
+    }).then([] {
+      return test_v2_failover();
     }).then([] {
       std::cout << "All tests succeeded" << std::endl;
     }).handle_exception([] (auto eptr) {