]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson/test_messenger: support cross-core protocol-level tests
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 3 Aug 2023 07:42:45 +0000 (15:42 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 23 Aug 2023 04:49:09 +0000 (12:49 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index 1359b5aeb62ea923e6a69462cfc1dd7181cd2848..3c6616cbd6a16aa00478207111899d9553aa94c9 100644 (file)
@@ -671,15 +671,18 @@ struct TestInterceptor : public Interceptor {
   std::map<Connection*, unsigned> conns;
   ConnResults results;
   std::optional<seastar::abort_source> signal;
+  const seastar::shard_id primary_sid;
+
+  TestInterceptor() : primary_sid{seastar::this_shard_id()} {}
 
-  TestInterceptor() = default;
   // only used for copy breakpoint configurations
-  TestInterceptor(const TestInterceptor& other) {
+  TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} {
     assert(other.breakpoints_counter.empty());
     assert(other.conns.empty());
     assert(other.results.empty());
     breakpoints = other.breakpoints;
     assert(!other.signal);
+    assert(seastar::this_shard_id() == primary_sid);
   }
 
   void make_fault(Breakpoint bp, unsigned round = 1) {
@@ -698,6 +701,7 @@ struct TestInterceptor : public Interceptor {
   }
 
   ConnResult* find_result(Connection *conn) {
+    assert(seastar::this_shard_id() == primary_sid);
     auto it = conns.find(conn);
     if (it == conns.end()) {
       return nullptr;
@@ -707,6 +711,7 @@ struct TestInterceptor : public Interceptor {
   }
 
   seastar::future<> wait() {
+    assert(seastar::this_shard_id() == primary_sid);
     assert(!signal);
     signal = seastar::abort_source();
     return seastar::sleep_abortable(10s, *signal).then([] {
@@ -717,6 +722,7 @@ struct TestInterceptor : public Interceptor {
   }
 
   void notify() {
+    assert(seastar::this_shard_id() == primary_sid);
     if (signal) {
       signal->request_abort();
       signal = std::nullopt;
@@ -778,65 +784,68 @@ struct TestInterceptor : public Interceptor {
   seastar::future<bp_action_t>
   intercept(Connection &_conn, std::vector<Breakpoint> bps) override {
     assert(bps.size() >= 1);
+    Connection *conn = &_conn;
 
-    std::vector<bp_action_t> actions;
-    for (const Breakpoint &bp : bps) {
-      ++breakpoints_counter[bp].counter;
+    return seastar::smp::submit_to(primary_sid, [conn, bps, this] {
+      std::vector<bp_action_t> actions;
+      for (const Breakpoint &bp : bps) {
+        ++breakpoints_counter[bp].counter;
 
-      auto result = find_result(&*conn);
-      if (result == nullptr) {
-        logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
-                       *conn, bp, breakpoints_counter[bp].counter);
-        ceph_abort();
-      }
+        auto result = find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
+                         *conn, bp, breakpoints_counter[bp].counter);
+          ceph_abort();
+        }
 
-      if (bp == custom_bp_t::SOCKET_CONNECTING) {
-        ++result->connect_attempts;
-        logger().info("[Test] connect_attempts={}", result->connect_attempts);
-      } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
-        ++result->client_connect_attempts;
-        logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
-      } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
-        ++result->client_reconnect_attempts;
-        logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
-      } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
-        ++result->accept_attempts;
-        logger().info("[Test] accept_attempts={}", result->accept_attempts);
-      } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
-        ++result->server_connect_attempts;
-        logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
-      } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
-        ++result->server_reconnect_attempts;
-        logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
-      }
+        if (bp == custom_bp_t::SOCKET_CONNECTING) {
+          ++result->connect_attempts;
+          logger().info("[Test] connect_attempts={}", result->connect_attempts);
+        } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
+          ++result->client_connect_attempts;
+          logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
+        } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
+          ++result->client_reconnect_attempts;
+          logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
+        } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+          ++result->accept_attempts;
+          logger().info("[Test] accept_attempts={}", result->accept_attempts);
+        } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
+          ++result->server_connect_attempts;
+          logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
+        } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+          ++result->server_reconnect_attempts;
+          logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+        }
 
-      auto it_bp = breakpoints.find(bp);
-      if (it_bp != breakpoints.end()) {
-        auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
-        if (it_cnt != it_bp->second.end()) {
-          logger().info("[{}] {} intercepted {}({}) => {}",
-                        result->index, *conn, bp,
-                        breakpoints_counter[bp].counter, it_cnt->second);
-          actions.emplace_back(it_cnt->second);
-          continue;
+        auto it_bp = breakpoints.find(bp);
+        if (it_bp != breakpoints.end()) {
+          auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+          if (it_cnt != it_bp->second.end()) {
+            logger().info("[{}] {} intercepted {}({}) => {}",
+                          result->index, *conn, bp,
+                          breakpoints_counter[bp].counter, it_cnt->second);
+            actions.emplace_back(it_cnt->second);
+            continue;
+          }
         }
+        logger().info("[{}] {} intercepted {}({})",
+                      result->index, *conn, bp, breakpoints_counter[bp].counter);
+        actions.emplace_back(bp_action_t::CONTINUE);
       }
-      logger().info("[{}] {} intercepted {}({})",
-                    result->index, *conn, bp, breakpoints_counter[bp].counter);
-      actions.emplace_back(bp_action_t::CONTINUE);
-    }
 
-    bp_action_t action = bp_action_t::CONTINUE;
-    for (bp_action_t &a : actions) {
-      if (a != bp_action_t::CONTINUE) {
-        if (action == bp_action_t::CONTINUE) {
-          action = a;
-        } else {
-          ceph_abort("got multiple incompatible actions");
+      bp_action_t action = bp_action_t::CONTINUE;
+      for (bp_action_t &a : actions) {
+        if (a != bp_action_t::CONTINUE) {
+          if (action == bp_action_t::CONTINUE) {
+            action = a;
+          } else {
+            ceph_abort("got multiple incompatible actions");
+          }
         }
       }
-    }
-    return seastar::make_ready_future<bp_action_t>(action);
+      return seastar::make_ready_future<bp_action_t>(action);
+    });
   }
 };
 
@@ -872,123 +881,158 @@ class FailoverSuite : public Dispatcher {
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
-  std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
-    auto result = interceptor.find_result(&*conn);
-    if (result == nullptr) {
-      logger().error("Untracked ms dispatched connection: {}", *conn);
-      ceph_abort();
-    }
-
-    if (tracked_conn != &*conn) {
-      logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
-                    result->index, *conn, tracked_index, *tracked_conn);
-    } else {
-      ceph_assert(result->index == tracked_index);
-    }
+  ShardedGates &gates;
+  const seastar::shard_id primary_sid;
 
+  std::optional<seastar::future<>> ms_dispatch(
+      ConnectionRef conn_ref, MessageRef m) override {
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-    ceph_assert(pending_receive > 0);
-    --pending_receive;
-    if (pending_receive == 0) {
-      interceptor.notify();
-    }
-    logger().info("[Test] got op, left {} ops -- [{}] {}",
-                  pending_receive, result->index, *conn);
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked ms dispatched connection: {}", *conn);
+          ceph_abort();
+        }
+
+        if (tracked_conn != &*conn) {
+          logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+                        result->index, *conn, tracked_index, *tracked_conn);
+        } else {
+          ceph_assert(result->index == tracked_index);
+        }
+
+        ceph_assert(pending_receive > 0);
+        --pending_receive;
+        if (pending_receive == 0) {
+          interceptor.notify();
+        }
+        logger().info("[Test] got op, left {} ops -- [{}] {}",
+                      pending_receive, result->index, *conn);
+      }).then([conn_ref] {});
+    });
     return {seastar::now()};
   }
 
   void ms_handle_accept(
-      ConnectionRef conn,
+      ConnectionRef conn_ref,
       seastar::shard_id prv_shard,
       bool is_replace) override {
-    assert(prv_shard == seastar::this_shard_id());
-    auto result = interceptor.find_result(&*conn);
-    if (result == nullptr) {
-      logger().error("Untracked accepted connection: {}", *conn);
-      ceph_abort();
-    }
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked accepted connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn &&
-        !tracked_conn->is_protocol_closed() &&
-        tracked_conn != &*conn) {
-      logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
+        if (tracked_conn &&
+            !tracked_conn->is_protocol_closed() &&
+            tracked_conn != &*conn) {
+          logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
+                         result->index, *conn, tracked_index, *tracked_conn);
+          ceph_abort();
+        }
 
-    tracked_index = result->index;
-    tracked_conn = &*conn;
-    ++result->cnt_accept_dispatched;
-    logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
-                  result->cnt_accept_dispatched, result->index, *conn);
-    std::ignore = flush_pending_send();
+        tracked_index = result->index;
+        tracked_conn = &*conn;
+        ++result->cnt_accept_dispatched;
+        logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
+                      result->cnt_accept_dispatched, result->index, *conn);
+        return flush_pending_send();
+      }).then([conn_ref] {});
+    });
   }
 
   void ms_handle_connect(
-      ConnectionRef conn,
+      ConnectionRef conn_ref,
       seastar::shard_id prv_shard) override {
-    assert(prv_shard == seastar::this_shard_id());
-    auto result = interceptor.find_result(&*conn);
-    if (result == nullptr) {
-      logger().error("Untracked connected connection: {}", *conn);
-      ceph_abort();
-    }
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked connected connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn &&
-        !tracked_conn->is_protocol_closed() &&
-        tracked_conn != &*conn) {
-      logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
+        if (tracked_conn &&
+            !tracked_conn->is_protocol_closed() &&
+            tracked_conn != &*conn) {
+          logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
+                         result->index, *conn, tracked_index, *tracked_conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn == &*conn) {
-      ceph_assert(result->index == tracked_index);
-    }
+        if (tracked_conn == &*conn) {
+          ceph_assert(result->index == tracked_index);
+        }
 
-    ++result->cnt_connect_dispatched;
-    logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
-                  result->cnt_connect_dispatched, result->index, *conn);
+        ++result->cnt_connect_dispatched;
+        logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
+                      result->cnt_connect_dispatched, result->index, *conn);
+      }).then([conn_ref] {});
+    });
   }
 
-  void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
-    auto result = interceptor.find_result(&*conn);
-    if (result == nullptr) {
-      logger().error("Untracked reset connection: {}", *conn);
-      ceph_abort();
-    }
+  void ms_handle_reset(
+      ConnectionRef conn_ref,
+      bool is_replace) override {
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked reset connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn != &*conn) {
-      logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
-                    result->index, *conn, tracked_index, *tracked_conn);
-    } else {
-      ceph_assert(result->index == tracked_index);
-      tracked_index = 0;
-      tracked_conn = nullptr;
-    }
+        if (tracked_conn != &*conn) {
+          logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+                        result->index, *conn, tracked_index, *tracked_conn);
+        } else {
+          ceph_assert(result->index == tracked_index);
+          tracked_index = 0;
+          tracked_conn = nullptr;
+        }
 
-    ++result->cnt_reset_dispatched;
-    logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
-                  result->cnt_reset_dispatched, result->index, *conn);
+        ++result->cnt_reset_dispatched;
+        logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
+                      result->cnt_reset_dispatched, result->index, *conn);
+      }).then([conn_ref] {});
+    });
   }
 
-  void ms_handle_remote_reset(ConnectionRef conn) override {
-    auto result = interceptor.find_result(&*conn);
-    if (result == nullptr) {
-      logger().error("Untracked remotely reset connection: {}", *conn);
-      ceph_abort();
-    }
+  void ms_handle_remote_reset(
+      ConnectionRef conn_ref) override {
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked remotely reset connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn != &*conn) {
-      logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
-                    result->index, *conn, tracked_index, *tracked_conn);
-    } else {
-      ceph_assert(result->index == tracked_index);
-    }
+        if (tracked_conn != &*conn) {
+          logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+                        result->index, *conn, tracked_index, *tracked_conn);
+        } else {
+          ceph_assert(result->index == tracked_index);
+        }
 
-    ++result->cnt_remote_reset_dispatched;
-    logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
-                  result->cnt_remote_reset_dispatched, result->index, *conn);
+        ++result->cnt_remote_reset_dispatched;
+        logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
+                      result->cnt_remote_reset_dispatched, result->index, *conn);
+      }).then([conn_ref] {});
+    });
   }
 
  private:
@@ -1037,6 +1081,7 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> wait_ready(unsigned num_ready_conns,
                                unsigned num_replaced,
                                bool wait_received) {
+    assert(seastar::this_shard_id() == primary_sid);
     unsigned pending_conns = 0;
     unsigned pending_establish = 0;
     unsigned replaced_conns = 0;
@@ -1116,10 +1161,13 @@ class FailoverSuite : public Dispatcher {
  public:
   FailoverSuite(MessengerRef test_msgr,
                 entity_addr_t test_peer_addr,
-                const TestInterceptor& interceptor)
+                const TestInterceptor& interceptor,
+                ShardedGates &gates)
     : test_msgr(test_msgr),
       test_peer_addr(test_peer_addr),
-      interceptor(interceptor) { }
+      interceptor(interceptor),
+      gates{gates},
+      primary_sid{seastar::this_shard_id()} { }
 
   entity_addr_t get_addr() const {
     return test_msgr->get_myaddr();
@@ -1170,14 +1218,17 @@ class FailoverSuite : public Dispatcher {
   create(entity_addr_t test_addr,
          SocketPolicy test_policy,
          entity_addr_t test_peer_addr,
-         const TestInterceptor& interceptor) {
+         const TestInterceptor& interceptor,
+         ShardedGates &gates) {
     auto suite = std::make_unique<FailoverSuite>(
         Messenger::create(
           entity_name_t::OSD(TEST_OSD),
           "Test",
           TEST_NONCE,
           true),
-        test_peer_addr, interceptor);
+        test_peer_addr,
+        interceptor,
+        gates);
     return suite->init(test_addr, test_policy
     ).then([suite = std::move(suite)] () mutable {
       return std::move(suite);
@@ -1188,6 +1239,7 @@ class FailoverSuite : public Dispatcher {
  public:
   seastar::future<> connect_peer() {
     logger().info("[Test] connect_peer({})", test_peer_addr);
+    assert(seastar::this_shard_id() == primary_sid);
     auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
     auto result = interceptor.find_result(&*conn);
     ceph_assert(result != nullptr);
@@ -1212,6 +1264,7 @@ class FailoverSuite : public Dispatcher {
   }
 
   seastar::future<> send_peer() {
+    assert(seastar::this_shard_id() == primary_sid);
     if (tracked_conn) {
       logger().info("[Test] send_peer()");
       ceph_assert(!tracked_conn->is_protocol_closed());
@@ -1226,6 +1279,7 @@ class FailoverSuite : public Dispatcher {
 
   seastar::future<> keepalive_peer() {
     logger().info("[Test] keepalive_peer()");
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
     ceph_assert(!tracked_conn->is_protocol_closed());
     return tracked_conn->send_keepalive();
@@ -1233,6 +1287,7 @@ class FailoverSuite : public Dispatcher {
 
   seastar::future<> try_send_peer() {
     logger().info("[Test] try_send_peer()");
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
     ceph_assert(!tracked_conn->is_protocol_closed());
     return send_op(false);
@@ -1240,21 +1295,28 @@ class FailoverSuite : public Dispatcher {
 
   seastar::future<> markdown() {
     logger().info("[Test] markdown() in 100ms ...");
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
     // sleep to propagate potential remaining acks
     return seastar::sleep(100ms
     ).then([this] {
-      tracked_conn->mark_down();
+      return seastar::smp::submit_to(
+          tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] {
+        assert(tracked_conn->get_shard_id() == seastar::this_shard_id());
+        tracked_conn->mark_down();
+      });
     });
   }
 
   seastar::future<> wait_blocked() {
     logger().info("[Test] wait_blocked() ...");
+    assert(seastar::this_shard_id() == primary_sid);
     return interceptor.blocker.wait_blocked();
   }
 
   void unblock() {
     logger().info("[Test] unblock()");
+    assert(seastar::this_shard_id() == primary_sid);
     return interceptor.blocker.unblock();
   }
 
@@ -1277,6 +1339,7 @@ class FailoverSuite : public Dispatcher {
   }
 
   bool is_standby() {
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
     return tracked_conn->is_protocol_standby();
   }
@@ -1417,26 +1480,34 @@ class FailoverTest : public Dispatcher {
     logger().info("\n\n[{}]", name);
     ceph_assert(!test_suite);
     SocketPolicy test_policy_ = to_socket_policy(test_policy);
-    return FailoverSuite::create(
-        test_addr, test_policy_, test_peer_addr, interceptor
-    ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
-      ceph_assert(suite->get_addr() == test_addr);
-      test_suite.swap(suite);
-      return start_peer(peer_policy).then([this, f = std::move(f)] {
-        return f(*test_suite);
-      }).then([this] {
-        test_suite->post_check();
-        logger().info("\n[SUCCESS]");
-      }).handle_exception([this] (auto eptr) {
-        logger().info("\n[FAIL: {}]", eptr);
-        test_suite->dump_results();
-        throw;
-      }).then([this] {
-        return stop_peer();
-      }).then([this] {
-        return test_suite->shutdown().then([this] {
-          test_suite.reset();
+    return ShardedGates::create(
+    ).then([this, test_policy_, peer_policy, interceptor,
+            f=std::move(f)](auto *gates) mutable {
+      return FailoverSuite::create(
+        test_addr, test_policy_, test_peer_addr, interceptor, *gates
+      ).then([this, peer_policy, f = std::move(f)](auto suite) mutable {
+        ceph_assert(suite->get_addr() == test_addr);
+        test_suite.swap(suite);
+        return start_peer(peer_policy
+        ).then([this, f = std::move(f)] {
+          return f(*test_suite);
+        }).then([this] {
+          test_suite->post_check();
+          logger().info("\n[SUCCESS]");
+        }).handle_exception([this](auto eptr) {
+          logger().info("\n[FAIL: {}]", eptr);
+          test_suite->dump_results();
+          throw;
+        }).then([this] {
+          return stop_peer();
+        }).then([this] {
+          return test_suite->shutdown(
+          ).then([this] {
+            test_suite.reset();
+          });
         });
+      }).then([gates] {
+        return gates->close();
       });
     });
   }