]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: Protocol interceptor infrastructure
authorRicardo Dias <rdias@suse.com>
Mon, 18 Feb 2019 10:42:30 +0000 (10:42 +0000)
committerRicardo Dias <rdias@suse.com>
Wed, 20 Feb 2019 13:36:13 +0000 (13:36 +0000)
This interceptor infrastructure allows to control the protocol
steps by the testing code.
The test code can force the protocol to pause in a specific step,
and then it can control whether the protocol continues or if it
fails at that point.

Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/Connection.h
src/msg/Messenger.h
src/msg/async/AsyncConnection.cc
src/msg/async/ProtocolV2.cc

index 44cc5102fe0090643b90abbd33eaf26f95c2421f..7c8d6dc15fa836ba7619bf60f7bc988b30e63692 100644 (file)
@@ -37,6 +37,7 @@
 // abstract Connection, for keeping per-connection state
 
 class Messenger;
+class Interceptor;
 
 struct Connection : public RefCountedObject {
   mutable Mutex lock;
@@ -61,6 +62,8 @@ public:
   EntityName peer_name;
   uint64_t peer_global_id = 0;
 
+  Interceptor *interceptor;
+
   friend class boost::intrusive_ptr<Connection>;
   friend class PipeConnection;
 
index 911d9eea5f7f5341dbe4c35adecb13c89d84834e..432efb5fd7336c4cd05ca60d742acaace2d8c6d9 100644 (file)
@@ -49,6 +49,20 @@ class Timer;
 class AuthClient;
 class AuthServer;
 
+struct Interceptor {
+  std::mutex lock;
+  std::condition_variable cond_var;
+
+  enum ACTION : uint32_t {
+    CONTINUE = 0,
+    FAIL,
+    STOP
+  };
+
+  virtual ~Interceptor() {}
+  virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
+};
+
 class Messenger {
 private:
   std::deque<Dispatcher*> dispatchers;
@@ -75,6 +89,7 @@ protected:
 public:
   AuthClient *auth_client = 0;
   AuthServer *auth_server = 0;
+  Interceptor *interceptor = nullptr;
 
   /**
    * Various Messenger conditional config/type flags to allow
index 92b0df9cfb7632554e8f3a3da1ad2214fd6a0d65..6e4991b4058db1519df595df3f6943aece1deca2 100644 (file)
@@ -123,6 +123,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     msgr2(m2), state_offset(0),
     worker(w), center(&w->center),read_buffer(nullptr)
 {
+  this->interceptor = m->interceptor;
   read_handler = new C_handle_read(this);
   write_handler = new C_handle_write(this);
   write_callback_handler = new C_handle_write_callback(this);
index 6be5f2021f307d5b1d14d9a12e354e0c84c73891..8e5cad257cecf61456736ad6f079bd2613f21957 100644 (file)
@@ -96,6 +96,17 @@ const int SIGNATURE_BLOCK_SIZE = CEPH_CRYPTO_HMACSHA256_DIGESTSIZE;
 
 #define READB(L, B, C) read(CONTINUATION(C), L, B)
 
+#define INTERCEPT(S) { \
+if(connection->interceptor) { \
+  auto a = connection->interceptor->intercept(connection, (S)); \
+  if (a == Interceptor::ACTION::FAIL) { \
+    return _fault(); \
+  } else if (a == Interceptor::ACTION::STOP) { \
+    stop(); \
+    connection->dispatch_queue->queue_reset(connection); \
+    return nullptr; \
+  }}}
+
 static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
   // create a buffer to read into that matches the data alignment
   unsigned alloc_len = 0;
@@ -1254,6 +1265,8 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) {
   encode((uint16_t)banner_payload.length(), bl, 0);
   bl.claim_append(banner_payload);
 
+  INTERCEPT(state == CONNECTING ? 3 : 4);
+
   return WRITE(bl, "banner", _wait_for_peer_banner);
 }
 
@@ -1299,6 +1312,9 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) {
                                      // temp_buffer size as well
 
   next_payload_len = payload_len;
+
+  INTERCEPT(state == CONNECTING ? 5 : 6);
+
   return READ(next_payload_len, _handle_peer_banner_payload);
 }
 
@@ -1358,6 +1374,9 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
   }
 
   HelloFrame hello(this, messenger->get_mytype(), connection->target_addr);
+
+  INTERCEPT(state == CONNECTING ? 7 : 8);
+
   return WRITE(hello.get_buffer(), "hello frame", read_frame);
 }
 
@@ -1556,6 +1575,8 @@ CtPtr ProtocolV2::ready() {
                 << server_cookie << std::dec << " in_seq=" << in_seq
                 << " out_seq=" << out_seq << dendl;
 
+  INTERCEPT(15);
+
   return CONTINUE(read_frame);
 }
 
@@ -1605,6 +1626,8 @@ CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
     }
   }
 
+  INTERCEPT(16);
+
   // Reset state
   data_buf.clear();
   front.clear();
@@ -1913,6 +1936,8 @@ CtPtr ProtocolV2::handle_message_complete() {
     return _fault();
   }
 
+  INTERCEPT(17);
+
   message->set_byte_throttler(connection->policy.throttler_bytes);
   message->set_message_throttler(connection->policy.throttler_messages);
 
@@ -2083,6 +2108,9 @@ CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) {
 
 CtPtr ProtocolV2::start_client_banner_exchange() {
   ldout(cct, 20) << __func__ << dendl;
+
+  INTERCEPT(1);
+
   state = CONNECTING;
 
   global_seq = messenger->get_global_seq();
@@ -2122,6 +2150,9 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
     connection->dispatch_queue->queue_reset(connection);
     return nullptr;
   }
+
+  INTERCEPT(9);
+
   AuthRequestFrame frame(auth_meta->auth_method, preferred_modes, bl);
   return WRITE(frame.get_buffer(), "auth request", read_frame);
 }
@@ -2279,8 +2310,9 @@ CtPtr ProtocolV2::send_client_ident() {
                 << " flags=" << flags
                 << " cookie=" << client_cookie << std::dec << dendl;
 
-  bufferlist &bl = client_ident.get_buffer();
-  return WRITE(bl, "client ident", read_frame);
+  INTERCEPT(11);
+
+  return WRITE(client_ident.get_buffer(), "client ident", read_frame);
 }
 
 CtPtr ProtocolV2::send_reconnect() {
@@ -2298,6 +2330,9 @@ CtPtr ProtocolV2::send_reconnect() {
                 << server_cookie << std::dec
                 << " gs=" << global_seq << " cs=" << connect_seq
                 << " ms=" << in_seq << dendl;
+
+  INTERCEPT(13);
+
   return WRITE(reconnect.get_buffer(), "reconnect", read_frame);
 }
 
@@ -2441,6 +2476,9 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
 
 CtPtr ProtocolV2::start_server_banner_exchange() {
   ldout(cct, 20) << __func__ << dendl;
+
+  INTERCEPT(2);
+
   state = ACCEPTING;
 
   return _banner_exchange(CONTINUATION(post_server_banner_exchange));
@@ -2521,6 +2559,8 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
     return _fault();
   }
   if (r == 1) {
+    INTERCEPT(10);
+
     AuthDoneFrame auth_done(connection->peer_global_id, auth_meta->con_mode,
                            reply);
     return WRITE(auth_done.get_buffer(), "auth done", read_frame);
@@ -2736,6 +2776,9 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
                   << " cgs=" << reconnect.global_seq()
                   << ", ask client to retry global" << dendl;
     RetryGlobalFrame retry(this, exproto->peer_global_seq);
+
+    INTERCEPT(18);
+
     return WRITE(retry.get_buffer(), "session retry", read_frame);
   }
 
@@ -3044,8 +3087,9 @@ CtPtr ProtocolV2::send_server_ident() {
   connection->dispatch_queue->queue_accept(connection);
   messenger->ms_deliver_handle_fast_accept(connection);
 
-  bufferlist &bl = server_ident.get_buffer();
-  return WRITE(bl, "server ident", server_ready);
+  INTERCEPT(12);
+
+  return WRITE(server_ident.get_buffer(), "server ident", server_ready);
 }
 
 CtPtr ProtocolV2::server_ready() {
@@ -3098,6 +3142,7 @@ CtPtr ProtocolV2::send_reconnect_ok() {
   connection->dispatch_queue->queue_accept(connection);
   messenger->ms_deliver_handle_fast_accept(connection);
 
-  bufferlist &bl = reconnect_ok.get_buffer();
-  return WRITE(bl, "reconnect ok", server_ready);
+  INTERCEPT(14);
+
+  return WRITE(reconnect_ok.get_buffer(), "reconnect ok", server_ready);
 }