]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/crimson: implement STALL action for breakpoint
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 11 Sep 2019 01:39:27 +0000 (09:39 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:32:31 +0000 (12:32 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Interceptor.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/test/crimson/test_messenger.cc

index e76f990f098ad9f5d12107f25d76e1ca63c0851c..5e61e1c0261fc2c89cdf96f3f326649c219ce4d5 100644 (file)
@@ -36,22 +36,19 @@ enum class bp_type_t {
 enum class bp_action_t {
   CONTINUE = 0,
   FAULT,
-  BLOCK
+  BLOCK,
+  STALL
 };
 
 inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) {
   static const char *const action_names[] = {"CONTINUE",
                                              "FAULT",
-                                             "BLOCK"};
+                                             "BLOCK",
+                                             "STALL"};
   assert(static_cast<size_t>(action) < std::size(action_names));
   return out << action_names[static_cast<size_t>(action)];
 }
 
-struct socket_trap_t {
-  bp_type_t type;
-  bp_action_t action;
-};
-
 class socket_blocker {
   std::optional<seastar::promise<>> p_blocked;
   std::optional<seastar::promise<>> p_unblocked;
index 2509271ef39c92a9567292ba422a233717deb298..b936342b5283179ccbde5a6559c4beb80cab3cfb 100644 (file)
@@ -132,19 +132,7 @@ void intercept(Breakpoint bp, bp_type_t type,
                SocketConnection& conn, SocketFRef& socket) {
   if (conn.interceptor) {
     auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
-    switch (action) {
-      case bp_action_t::CONTINUE:
-       break;
-      case bp_action_t::FAULT:
-       socket->set_trap({type, action});
-       break;
-      case bp_action_t::BLOCK:
-       socket->set_trap({type, action}, &conn.interceptor->blocker);
-       logger().info("{} blocked at {}, waiting for unblock...", conn, bp);
-       break;
-      default:
-       ceph_abort("unexpected action from intercept");
-    }
+    socket->set_trap(type, action, &conn.interceptor->blocker);
   }
 }
 
index 8e00323ea4a1924007ce24e56eb994a37445a79b..4f7c09b94ef6c71c8baa0c3396d2339051748802 100644 (file)
@@ -59,7 +59,7 @@ struct bufferlist_consumer {
 seastar::future<bufferlist> Socket::read(size_t bytes)
 {
 #ifdef UNIT_TESTS_BUILT
-  return try_trap(bp_type_t::READ).then([bytes, this] {
+  return try_trap_pre(next_trap_read).then([bytes, this] {
 #endif
     if (bytes == 0) {
       return seastar::make_ready_future<bufferlist>();
@@ -73,6 +73,11 @@ seastar::future<bufferlist> Socket::read(size_t bytes)
       return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
     });
 #ifdef UNIT_TESTS_BUILT
+  }).then([this] (auto buf) {
+    return try_trap_post(next_trap_read
+    ).then([buf = std::move(buf)] () mutable {
+      return std::move(buf);
+    });
   });
 #endif
 }
@@ -80,7 +85,7 @@ seastar::future<bufferlist> Socket::read(size_t bytes)
 seastar::future<seastar::temporary_buffer<char>>
 Socket::read_exactly(size_t bytes) {
 #ifdef UNIT_TESTS_BUILT
-  return try_trap(bp_type_t::READ).then([bytes, this] {
+  return try_trap_pre(next_trap_read).then([bytes, this] {
 #endif
     if (bytes == 0) {
       return seastar::make_ready_future<seastar::temporary_buffer<char>>();
@@ -92,6 +97,11 @@ Socket::read_exactly(size_t bytes) {
       return seastar::make_ready_future<tmp_buf>(std::move(buf));
     });
 #ifdef UNIT_TESTS_BUILT
+  }).then([this] (auto buf) {
+    return try_trap_post(next_trap_read
+    ).then([buf = std::move(buf)] () mutable {
+      return std::move(buf);
+    });
   });
 #endif
 }
@@ -127,19 +137,36 @@ seastar::future<> Socket::close() {
 }
 
 #ifdef UNIT_TESTS_BUILT
-seastar::future<> Socket::try_trap(bp_type_t type) {
-  if (next_trap && next_trap->type == type) {
-    auto action = next_trap->action;
-    next_trap = std::nullopt;
-    switch (action) {
-     case bp_action_t::FAULT:
-      throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
-     case bp_action_t::BLOCK:
-      ceph_assert(blocker != nullptr);
-      return blocker->block();
-     default:
-      ceph_abort("unexpected action from trap");
-    }
+seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
+  auto action = trap;
+  trap = bp_action_t::CONTINUE;
+  switch (action) {
+   case bp_action_t::CONTINUE:
+    break;
+   case bp_action_t::FAULT:
+    throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
+   case bp_action_t::BLOCK:
+    return blocker->block();
+   case bp_action_t::STALL:
+    trap = action;
+    break;
+   default:
+    ceph_abort("unexpected action from trap");
+  }
+  return seastar::now();
+}
+
+seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
+  auto action = trap;
+  trap = bp_action_t::CONTINUE;
+  switch (action) {
+   case bp_action_t::CONTINUE:
+    break;
+   case bp_action_t::STALL:
+    shutdown();
+    return blocker->block();
+   default:
+    ceph_abort("unexpected action from trap");
   }
   return seastar::now();
 }
index 1882e54f5e73736864d23c3e541266c747a6d8a5..819a842a20e60389cfd6ccd7ce8f5130c55439e4 100644 (file)
@@ -86,10 +86,12 @@ class Socket
 
   seastar::future<> write(packet&& buf) {
 #ifdef UNIT_TESTS_BUILT
-    return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+    return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
 #endif
       return out.write(std::move(buf));
 #ifdef UNIT_TESTS_BUILT
+    }).then([this] {
+      return try_trap_post(next_trap_write);
     });
 #endif
   }
@@ -98,10 +100,12 @@ class Socket
   }
   seastar::future<> write_flush(packet&& buf) {
 #ifdef UNIT_TESTS_BUILT
-    return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+    return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
 #endif
       return out.write(std::move(buf)).then([this] { return out.flush(); });
 #ifdef UNIT_TESTS_BUILT
+    }).then([this] {
+      return try_trap_post(next_trap_write);
     });
 #endif
   }
@@ -124,15 +128,22 @@ class Socket
 
 #ifdef UNIT_TESTS_BUILT
  private:
-  std::optional<socket_trap_t> next_trap = std::nullopt;
+  bp_action_t next_trap_read = bp_action_t::CONTINUE;
+  bp_action_t next_trap_write = bp_action_t::CONTINUE;
   socket_blocker* blocker = nullptr;
-  seastar::future<> try_trap(bp_type_t type);
+  seastar::future<> try_trap_pre(bp_action_t& trap);
+  seastar::future<> try_trap_post(bp_action_t& trap);
 
  public:
-  void set_trap(socket_trap_t trap, socket_blocker* blocker_ = nullptr) {
-    ceph_assert(!next_trap);
-    next_trap = trap;
+  void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
     blocker = blocker_;
+    if (type == bp_type_t::READ) {
+      ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+      next_trap_read = action;
+    } else {
+      ceph_assert(next_trap_write == bp_action_t::CONTINUE);
+      next_trap_write = action;
+    }
   }
 #endif
 };
index fd84e66dc4ed9fe825ddbf970b70776da282bf47..ebade08c907809f8bd0de70d87054da101d93934 100644 (file)
@@ -747,6 +747,11 @@ struct TestInterceptor : public Interceptor {
     breakpoints[bp][round] = bp_action_t::BLOCK;
   }
 
+  void make_stall(Breakpoint bp, unsigned round = 1) {
+    assert(round >= 1);
+    breakpoints[bp][round] = bp_action_t::STALL;
+  }
+
   ConnResult* find_result(ConnectionRef conn) {
     auto it = conns.find(conn);
     if (it == conns.end()) {