]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/crimson: implement BLOCK action for breakpoint
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 6 Sep 2019 02:56:41 +0000 (10:56 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:24:05 +0000 (12:24 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Interceptor.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/test/crimson/test_messenger.cc

index 7a263448cbed4c4df12484817cce344ac23c00e7..e76f990f098ad9f5d12107f25d76e1ca63c0851c 100644 (file)
@@ -33,6 +33,58 @@ enum class bp_type_t {
   WRITE
 };
 
+enum class bp_action_t {
+  CONTINUE = 0,
+  FAULT,
+  BLOCK
+};
+
+inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) {
+  static const char *const action_names[] = {"CONTINUE",
+                                             "FAULT",
+                                             "BLOCK"};
+  assert(static_cast<size_t>(action) < std::size(action_names));
+  return out << action_names[static_cast<size_t>(action)];
+}
+
+struct socket_trap_t {
+  bp_type_t type;
+  bp_action_t action;
+};
+
+class socket_blocker {
+  std::optional<seastar::promise<>> p_blocked;
+  std::optional<seastar::promise<>> p_unblocked;
+
+ public:
+  seastar::future<> wait_blocked() {
+    ceph_assert(!p_blocked);
+    if (p_unblocked) {
+      return seastar::now();
+    } else {
+      p_blocked = seastar::promise<>();
+      return p_blocked->get_future();
+    }
+  }
+
+  seastar::future<> block() {
+    if (p_blocked) {
+      p_blocked->set_value();
+      p_blocked = std::nullopt;
+    }
+    ceph_assert(!p_unblocked);
+    p_unblocked = seastar::promise<>();
+    return p_unblocked->get_future();
+  }
+
+  void unblock() {
+    ceph_assert(!p_blocked);
+    ceph_assert(p_unblocked);
+    p_unblocked->set_value();
+    p_unblocked = std::nullopt;
+  }
+};
+
 struct tag_bp_t {
   ceph::msgr::v2::Tag tag;
   bp_type_t type;
@@ -93,12 +145,13 @@ inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) {
 }
 
 struct Interceptor {
+  socket_blocker blocker;
   virtual ~Interceptor() {}
   virtual void register_conn(Connection& conn) = 0;
   virtual void register_conn_ready(Connection& conn) = 0;
   virtual void register_conn_closed(Connection& conn) = 0;
   virtual void register_conn_replaced(Connection& conn) = 0;
-  virtual bool intercept(Connection& conn, Breakpoint bp) = 0;
+  virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0;
 };
 
 } // namespace ceph::net
index 7931d1642b7f1015310baa9c1bddfdecfbea90a3..4c62f8182761f936c2c8a9647d54dff3a819deb6 100644 (file)
@@ -128,17 +128,46 @@ inline ostream& operator<<(
 namespace ceph::net {
 
 #ifdef UNIT_TESTS_BUILT
+void intercept(Breakpoint bp, bp_type_t type,
+               SocketConnection& conn, SocketFRef& socket) {
+  if (conn.interceptor) {
+    auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
+    switch (action) {
+      case bp_action_t::CONTINUE:
+       break;
+      case bp_action_t::FAULT:
+       socket->set_trap({type, action});
+       break;
+      case bp_action_t::BLOCK:
+       socket->set_trap({type, action}, &conn.interceptor->blocker);
+       logger().info("{} blocked at {}, waiting for unblock...", conn, bp);
+       break;
+      default:
+       ceph_abort("unexpected action from intercept");
+    }
+  }
+}
+
+#define INTERCEPT_CUSTOM(bp, type)       \
+intercept({bp}, type, conn, socket)
+
+#define INTERCEPT_FRAME(tag, type)       \
+intercept({static_cast<Tag>(tag), type}, \
+          type, conn, socket)
 
-#define INTERCEPT(...)                  \
-if (conn.interceptor) {                 \
-  if (conn.interceptor->intercept(      \
-      conn, Breakpoint(__VA_ARGS__))) { \
-    abort_in_fault();                   \
-  }                                     \
+#define INTERCEPT_N_RW(bp)                               \
+if (conn.interceptor) {                                  \
+  auto action = conn.interceptor->intercept(conn, {bp}); \
+  ceph_assert(action != bp_action_t::BLOCK);             \
+  if (action == bp_action_t::FAULT) {                    \
+    abort_in_fault();                                    \
+  }                                                      \
 }
 
 #else
-#define INTERCEPT(...)
+#define INTERCEPT_CUSTOM(bp, type)
+#define INTERCEPT_FRAME(tag, type)
+#define INTERCEPT_N_RW(bp)
 #endif
 
 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
@@ -317,7 +346,7 @@ seastar::future<Tag> ProtocolV2::read_main_preamble()
         rx_segments_desc.emplace_back(main_preamble.segments[idx]);
       }
 
-      INTERCEPT(static_cast<Tag>(main_preamble.tag), bp_type_t::READ);
+      INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
       return static_cast<Tag>(main_preamble.tag);
     });
 }
@@ -400,7 +429,7 @@ seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
   logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
                  conn, bl.length(), (int)main_preamble->tag,
                  (int)main_preamble->num_segments, main_preamble->crc);
-  INTERCEPT(static_cast<Tag>(main_preamble->tag), bp_type_t::WRITE);
+  INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
   if (flush) {
     return write_flush(std::move(bl));
   } else {
@@ -492,13 +521,13 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
                  conn, bl.length(), len_payload,
                  CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
-  INTERCEPT(custom_bp_t::BANNER_WRITE);
+  INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
   return write_flush(std::move(bl)).then([this] {
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+      INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
       return read_exactly(banner_len); // or read exactly?
     }).then([this] (auto bl) {
-      INTERCEPT(custom_bp_t::BANNER_READ);
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
       logger().debug("{} RECV({}) banner: \"{}\"",
@@ -526,9 +555,9 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
         abort_in_fault();
       }
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+      INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
       return read(payload_len);
     }).then([this] (bufferlist bl) {
-      INTERCEPT(custom_bp_t::BANNER_PAYLOAD_READ);
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
       uint64_t peer_supported_features;
@@ -904,7 +933,7 @@ void ProtocolV2::execute_connecting()
               return sock->close().then([sock = std::move(sock)] {});
             });
           }
-          INTERCEPT(custom_bp_t::SOCKET_CONNECTING);
+          INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
           return Socket::connect(conn.peer_addr);
         }).then([this](SocketFRef sock) {
           logger().debug("{} socket connected", conn);
@@ -1515,7 +1544,7 @@ void ProtocolV2::execute_accepting()
   trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
   seastar::with_gate(pending_dispatch, [this] {
       return seastar::futurize_apply([this] {
-          INTERCEPT(custom_bp_t::SOCKET_ACCEPTED);
+          INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
           enable_recording();
@@ -1794,19 +1823,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
     bl.append(keepalive_frame.get_buffer(session_stream_handlers));
-    INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
+    INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
   }
 
   if (unlikely(_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
     bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
-    INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
+    INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
   }
 
   if (require_ack && !num_msgs) {
     auto ack_frame = AckFrame::Encode(conn.in_seq);
     bl.append(ack_frame.get_buffer(session_stream_handlers));
-    INTERCEPT(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
+    INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
 
   std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
@@ -1835,7 +1864,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
     bl.append(message.get_buffer(session_stream_handlers));
-    INTERCEPT(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
+    INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
   });
 
   return bl;
index 763b4e2129c235cfa39e0d708ce3e58a370e94a9..8e00323ea4a1924007ce24e56eb994a37445a79b 100644 (file)
@@ -58,32 +58,42 @@ struct bufferlist_consumer {
 
 seastar::future<bufferlist> Socket::read(size_t bytes)
 {
-  if (bytes == 0) {
-    return seastar::make_ready_future<bufferlist>();
-  }
-  r.buffer.clear();
-  r.remaining = bytes;
-  return in.consume(bufferlist_consumer{r.buffer, r.remaining})
-    .then([this] {
+#ifdef UNIT_TESTS_BUILT
+  return try_trap(bp_type_t::READ).then([bytes, this] {
+#endif
+    if (bytes == 0) {
+      return seastar::make_ready_future<bufferlist>();
+    }
+    r.buffer.clear();
+    r.remaining = bytes;
+    return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
       if (r.remaining) { // throw on short reads
         throw std::system_error(make_error_code(error::read_eof));
       }
       return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
     });
+#ifdef UNIT_TESTS_BUILT
+  });
+#endif
 }
 
 seastar::future<seastar::temporary_buffer<char>>
 Socket::read_exactly(size_t bytes) {
-  if (bytes == 0) {
-    return seastar::make_ready_future<seastar::temporary_buffer<char>>();
-  }
-  return in.read_exactly(bytes)
-    .then([this](auto buf) {
+#ifdef UNIT_TESTS_BUILT
+  return try_trap(bp_type_t::READ).then([bytes, this] {
+#endif
+    if (bytes == 0) {
+      return seastar::make_ready_future<seastar::temporary_buffer<char>>();
+    }
+    return in.read_exactly(bytes).then([this](auto buf) {
       if (buf.empty()) {
         throw std::system_error(make_error_code(error::read_eof));
       }
       return seastar::make_ready_future<tmp_buf>(std::move(buf));
     });
+#ifdef UNIT_TESTS_BUILT
+  });
+#endif
 }
 
 void Socket::shutdown() {
@@ -116,4 +126,23 @@ seastar::future<> Socket::close() {
   });
 }
 
+#ifdef UNIT_TESTS_BUILT
+seastar::future<> Socket::try_trap(bp_type_t type) {
+  if (next_trap && next_trap->type == type) {
+    auto action = next_trap->action;
+    next_trap = std::nullopt;
+    switch (action) {
+     case bp_action_t::FAULT:
+      throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
+     case bp_action_t::BLOCK:
+      ceph_assert(blocker != nullptr);
+      return blocker->block();
+     default:
+      ceph_abort("unexpected action from trap");
+    }
+  }
+  return seastar::now();
+}
+#endif
+
 } // namespace ceph::net
index 0bc61516138fc8acab0a775a6060a6f2fcbd4b28..1882e54f5e73736864d23c3e541266c747a6d8a5 100644 (file)
 #include "include/buffer.h"
 #include "msg/msg_types.h"
 
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
 namespace ceph::net {
 
 class Socket;
@@ -81,13 +85,25 @@ class Socket
   seastar::future<tmp_buf> read_exactly(size_t bytes);
 
   seastar::future<> write(packet&& buf) {
-    return out.write(std::move(buf));
+#ifdef UNIT_TESTS_BUILT
+    return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+#endif
+      return out.write(std::move(buf));
+#ifdef UNIT_TESTS_BUILT
+    });
+#endif
   }
   seastar::future<> flush() {
     return out.flush();
   }
   seastar::future<> write_flush(packet&& buf) {
-    return out.write(std::move(buf)).then([this] { return out.flush(); });
+#ifdef UNIT_TESTS_BUILT
+    return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+#endif
+      return out.write(std::move(buf)).then([this] { return out.flush(); });
+#ifdef UNIT_TESTS_BUILT
+    });
+#endif
   }
 
   // preemptively disable further reads or writes, can only be shutdown once.
@@ -105,6 +121,20 @@ class Socket
   void force_shutdown_out() {
     socket.shutdown_output();
   }
+
+#ifdef UNIT_TESTS_BUILT
+ private:
+  std::optional<socket_trap_t> next_trap = std::nullopt;
+  socket_blocker* blocker = nullptr;
+  seastar::future<> try_trap(bp_type_t type);
+
+ public:
+  void set_trap(socket_trap_t trap, socket_blocker* blocker_ = nullptr) {
+    ceph_assert(!next_trap);
+    next_trap = trap;
+    blocker = blocker_;
+  }
+#endif
 };
 
 } // namespace ceph::net
index 5a6dbae9ded3dc5ac00f49e8bdbc9c81193c7d19..1bba72861ce47312b75afb50dfb73b5c41d880fa 100644 (file)
@@ -581,6 +581,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
 }
 
 using ceph::msgr::v2::Tag;
+using ceph::net::bp_action_t;
 using ceph::net::bp_type_t;
 using ceph::net::Breakpoint;
 using ceph::net::Connection;
@@ -720,7 +721,7 @@ struct ConnResult {
 using ConnResults = std::vector<ConnResult>;
 
 struct TestInterceptor : public Interceptor {
-  std::map<Breakpoint, std::set<unsigned>> breakpoints;
+  std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
   std::map<Breakpoint, counter_t> breakpoints_counter;
   std::map<ConnectionRef, unsigned> conns;
   ConnResults results;
@@ -738,7 +739,12 @@ struct TestInterceptor : public Interceptor {
 
   void make_fault(Breakpoint bp, unsigned round = 1) {
     assert(round >= 1);
-    breakpoints[bp].insert(round);
+    breakpoints[bp][round] = bp_action_t::FAULT;
+  }
+
+  void make_block(Breakpoint bp, unsigned round = 1) {
+    assert(round >= 1);
+    breakpoints[bp][round] = bp_action_t::BLOCK;
   }
 
   ConnResult* find_result(ConnectionRef conn) {
@@ -809,7 +815,7 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} {}", result->index, conn, result->state);
   }
 
-  bool intercept(Connection& conn, Breakpoint bp) override {
+  bp_action_t intercept(Connection& conn, Breakpoint bp) override {
     ++breakpoints_counter[bp].counter;
 
     auto result = find_result(conn.shared_from_this());
@@ -840,12 +846,12 @@ struct TestInterceptor : public Interceptor {
         logger().info("[{}] {} intercepted {}({}) => {}",
                       result->index, conn, bp,
                       breakpoints_counter[bp].counter, it_cnt->second);
-        return true;
+        return it_cnt->second;
       }
     }
     logger().info("[{}] {} intercepted {}({})",
                   result->index, conn, bp, breakpoints_counter[bp].counter);
-    return false;
+    return bp_action_t::CONTINUE;
   }
 };
 
@@ -1146,7 +1152,7 @@ class FailoverSuite : public Dispatcher {
       if (it == interceptor.breakpoints_counter.end()) {
         throw std::runtime_error(fmt::format("{} was missed", kv.first));
       }
-      auto expected = *std::max_element(kv.second.begin(), kv.second.end());
+      auto expected = kv.second.rbegin()->first;
       if (expected > it->second.counter) {
         throw std::runtime_error(fmt::format(
               "{} only triggered {} times, not the expected {}",
@@ -1202,6 +1208,16 @@ class FailoverSuite : public Dispatcher {
     }
   }
 
+  seastar::future<> wait_blocked() {
+    logger().info("Waiting for blocked...");
+    return interceptor.blocker.wait_blocked();
+  }
+
+  void unblock() {
+    logger().info("Unblock breakpoint");
+    return interceptor.blocker.unblock();
+  }
+
   seastar::future<> wait_replaced(unsigned count) {
     return wait_ready(0, count, false);
   }