]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/crimson: implement different ways to wait in tests
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 6 Sep 2019 02:46:47 +0000 (10:46 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:24:05 +0000 (12:24 +0800)
wait_replaced(count): wait for connection replaced;
wait_established(): wait for all pending messages sent and received;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index 158cd13ed74d3d95a25bed6757ae487adbbdafe2..5a6dbae9ded3dc5ac00f49e8bdbc9c81193c7d19 100644 (file)
@@ -1037,57 +1037,79 @@ class FailoverSuite : public Dispatcher {
     });
   }
 
-  seastar::future<> wait_ready(unsigned num_conns) {
-    assert(num_conns > 0);
-    if (interceptor.results.size() > num_conns) {
-      throw std::runtime_error(fmt::format(
-            "{} connections, more than expected: {}",
-            interceptor.results.size(), num_conns));
-    }
-
+  seastar::future<> wait_ready(unsigned num_ready_conns,
+                               unsigned num_replaced,
+                               bool wait_received) {
+    unsigned pending_conns = 0;
+    unsigned pending_establish = 0;
+    unsigned replaced_conns = 0;
     for (auto& result : interceptor.results) {
       if (result.conn->is_closed()) {
-        continue;
-      }
-
-      if (result.conn->is_connected()) {
+        if (result.state == conn_state_t::replaced) {
+          ++replaced_conns;
+        }
+      } else if (result.conn->is_connected()) {
         if (tracked_conn != result.conn || tracked_index != result.index) {
           throw std::runtime_error(fmt::format(
                 "The connected connection [{}] {} doesn't"
                 " match the tracked connection [{}] {}",
                 result.index, *result.conn, tracked_index, tracked_conn));
         }
-
-        if (pending_send || pending_peer_receive || pending_receive) {
-          logger().info("Waiting for pending_send={} pending_peer_receive={}"
-                        " pending_receive={} from [{}] {}",
-                        pending_send, pending_peer_receive, pending_receive,
-                        result.index, *result.conn);
-          return interceptor.wait().then([this, num_conns] {
-            return wait_ready(num_conns);
-          });
-        } else {
+        if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
           result.state = conn_state_t::established;
+        } else {
+          ++pending_establish;
         }
       } else {
-        logger().info("Waiting for connection [{}] {} connected/closed",
-                      result.index, *result.conn);
-        return interceptor.wait().then([this, num_conns] {
-          return wait_ready(num_conns);
-        });
+        ++pending_conns;
       }
     }
 
-    if (interceptor.results.size() < num_conns) {
-      logger().info("Waiting for incoming connection, currently {}, expected {}",
-                    interceptor.results.size(), num_conns);
-      return interceptor.wait().then([this, num_conns] {
-        return wait_ready(num_conns);
-      });
+    bool do_wait = false;
+    if (num_ready_conns > 0) {
+      if (interceptor.results.size() > num_ready_conns) {
+        throw std::runtime_error(fmt::format(
+              "{} connections,more than expected: {}",
+              interceptor.results.size(), num_ready_conns));
+      } else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) {
+        logger().info("Waiting for connections, currently {} out of {},"
+                      " pending {} ready ...",
+                      interceptor.results.size(), num_ready_conns, pending_conns);
+        do_wait = true;
+      }
+    }
+    if (wait_received &&
+        (pending_send || pending_peer_receive || pending_receive)) {
+      if (pending_conns || pending_establish) {
+        logger().info("Waiting for pending_send={}, pending_peer_receive={},"
+                      " pending_receive={}, with {} pending connections ...",
+                      pending_send, pending_peer_receive, pending_receive,
+                      pending_conns + pending_establish);
+        do_wait = true;
+      }
+    }
+    if (num_replaced > 0) {
+      if (replaced_conns > num_replaced) {
+        throw std::runtime_error(fmt::format(
+            "{} replaced connections, more than expected: {}",
+            replaced_conns, num_replaced));
+      }
+      if (replaced_conns < num_replaced) {
+        logger().info("Waiting for {} replaced connections, currently {} ...",
+                      num_replaced, replaced_conns);
+        do_wait = true;
+      }
     }
 
-    logger().debug("Wait done!");
-    return seastar::now();
+    if (do_wait) {
+      return interceptor.wait(
+      ).then([this, num_ready_conns, num_replaced, wait_received] {
+        return wait_ready(num_ready_conns, num_replaced, wait_received);
+      });
+    } else {
+      logger().info("Wait done!");
+      return seastar::now();
+    }
   }
 
  // called by FailoverTest
@@ -1180,9 +1202,17 @@ class FailoverSuite : public Dispatcher {
     }
   }
 
+  seastar::future<> wait_replaced(unsigned count) {
+    return wait_ready(0, count, false);
+  }
+
+  seastar::future<> wait_established() {
+    return wait_ready(0, 0, true);
+  }
+
   seastar::future<std::reference_wrapper<ConnResults>>
-  wait_results(unsigned num_conns) {
-    return wait_ready(num_conns).then([this] {
+  wait_results(unsigned count) {
+    return wait_ready(count, 0, true).then([this] {
       return std::reference_wrapper<ConnResults>(interceptor.results);
     });
   }