From 71ad12d538244f80142197309f0235c330e85a45 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 18 Feb 2019 10:42:30 +0000 Subject: [PATCH] msg/async: Protocol interceptor infrastructure This interceptor infrastructure allows to control the protocol steps by the testing code. The test code can force the protocol to pause in a specific step, and then it can control whether the protocol continues or if it fails at that point. Signed-off-by: Ricardo Dias --- src/msg/Connection.h | 3 ++ src/msg/Messenger.h | 15 +++++++++ src/msg/async/AsyncConnection.cc | 1 + src/msg/async/ProtocolV2.cc | 57 ++++++++++++++++++++++++++++---- 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/src/msg/Connection.h b/src/msg/Connection.h index 44cc5102fe0..7c8d6dc15fa 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -37,6 +37,7 @@ // abstract Connection, for keeping per-connection state class Messenger; +class Interceptor; struct Connection : public RefCountedObject { mutable Mutex lock; @@ -61,6 +62,8 @@ public: EntityName peer_name; uint64_t peer_global_id = 0; + Interceptor *interceptor; + friend class boost::intrusive_ptr; friend class PipeConnection; diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 911d9eea5f7..432efb5fd73 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -49,6 +49,20 @@ class Timer; class AuthClient; class AuthServer; +struct Interceptor { + std::mutex lock; + std::condition_variable cond_var; + + enum ACTION : uint32_t { + CONTINUE = 0, + FAIL, + STOP + }; + + virtual ~Interceptor() {} + virtual ACTION intercept(Connection *conn, uint32_t step) = 0; +}; + class Messenger { private: std::deque dispatchers; @@ -75,6 +89,7 @@ protected: public: AuthClient *auth_client = 0; AuthServer *auth_server = 0; + Interceptor *interceptor = nullptr; /** * Various Messenger conditional config/type flags to allow diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 92b0df9cfb7..6e4991b4058 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -123,6 +123,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu msgr2(m2), state_offset(0), worker(w), center(&w->center),read_buffer(nullptr) { + this->interceptor = m->interceptor; read_handler = new C_handle_read(this); write_handler = new C_handle_write(this); write_callback_handler = new C_handle_write_callback(this); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 6be5f2021f3..8e5cad257ce 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -96,6 +96,17 @@ const int SIGNATURE_BLOCK_SIZE = CEPH_CRYPTO_HMACSHA256_DIGESTSIZE; #define READB(L, B, C) read(CONTINUATION(C), L, B) +#define INTERCEPT(S) { \ +if(connection->interceptor) { \ + auto a = connection->interceptor->intercept(connection, (S)); \ + if (a == Interceptor::ACTION::FAIL) { \ + return _fault(); \ + } else if (a == Interceptor::ACTION::STOP) { \ + stop(); \ + connection->dispatch_queue->queue_reset(connection); \ + return nullptr; \ + }}} + static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment unsigned alloc_len = 0; @@ -1254,6 +1265,8 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) { encode((uint16_t)banner_payload.length(), bl, 0); bl.claim_append(banner_payload); + INTERCEPT(state == CONNECTING ? 3 : 4); + return WRITE(bl, "banner", _wait_for_peer_banner); } @@ -1299,6 +1312,9 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { // temp_buffer size as well next_payload_len = payload_len; + + INTERCEPT(state == CONNECTING ? 5 : 6); + return READ(next_payload_len, _handle_peer_banner_payload); } @@ -1358,6 +1374,9 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) { } HelloFrame hello(this, messenger->get_mytype(), connection->target_addr); + + INTERCEPT(state == CONNECTING ? 7 : 8); + return WRITE(hello.get_buffer(), "hello frame", read_frame); } @@ -1556,6 +1575,8 @@ CtPtr ProtocolV2::ready() { << server_cookie << std::dec << " in_seq=" << in_seq << " out_seq=" << out_seq << dendl; + INTERCEPT(15); + return CONTINUE(read_frame); } @@ -1605,6 +1626,8 @@ CtPtr ProtocolV2::handle_message_header(char *buffer, int r) { } } + INTERCEPT(16); + // Reset state data_buf.clear(); front.clear(); @@ -1913,6 +1936,8 @@ CtPtr ProtocolV2::handle_message_complete() { return _fault(); } + INTERCEPT(17); + message->set_byte_throttler(connection->policy.throttler_bytes); message->set_message_throttler(connection->policy.throttler_messages); @@ -2083,6 +2108,9 @@ CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) { CtPtr ProtocolV2::start_client_banner_exchange() { ldout(cct, 20) << __func__ << dendl; + + INTERCEPT(1); + state = CONNECTING; global_seq = messenger->get_global_seq(); @@ -2122,6 +2150,9 @@ CtPtr ProtocolV2::send_auth_request(std::vector &allowed_methods) { connection->dispatch_queue->queue_reset(connection); return nullptr; } + + INTERCEPT(9); + AuthRequestFrame frame(auth_meta->auth_method, preferred_modes, bl); return WRITE(frame.get_buffer(), "auth request", read_frame); } @@ -2279,8 +2310,9 @@ CtPtr ProtocolV2::send_client_ident() { << " flags=" << flags << " cookie=" << client_cookie << std::dec << dendl; - bufferlist &bl = client_ident.get_buffer(); - return WRITE(bl, "client ident", read_frame); + INTERCEPT(11); + + return WRITE(client_ident.get_buffer(), "client ident", read_frame); } CtPtr ProtocolV2::send_reconnect() { @@ -2298,6 +2330,9 @@ CtPtr ProtocolV2::send_reconnect() { << server_cookie << std::dec << " gs=" << global_seq << " cs=" << connect_seq << " ms=" << in_seq << dendl; + + INTERCEPT(13); + return WRITE(reconnect.get_buffer(), "reconnect", read_frame); } @@ -2441,6 +2476,9 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { CtPtr ProtocolV2::start_server_banner_exchange() { ldout(cct, 20) << __func__ << dendl; + + INTERCEPT(2); + state = ACCEPTING; return _banner_exchange(CONTINUATION(post_server_banner_exchange)); @@ -2521,6 +2559,8 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) return _fault(); } if (r == 1) { + INTERCEPT(10); + AuthDoneFrame auth_done(connection->peer_global_id, auth_meta->con_mode, reply); return WRITE(auth_done.get_buffer(), "auth done", read_frame); @@ -2736,6 +2776,9 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { << " cgs=" << reconnect.global_seq() << ", ask client to retry global" << dendl; RetryGlobalFrame retry(this, exproto->peer_global_seq); + + INTERCEPT(18); + return WRITE(retry.get_buffer(), "session retry", read_frame); } @@ -3044,8 +3087,9 @@ CtPtr ProtocolV2::send_server_ident() { connection->dispatch_queue->queue_accept(connection); messenger->ms_deliver_handle_fast_accept(connection); - bufferlist &bl = server_ident.get_buffer(); - return WRITE(bl, "server ident", server_ready); + INTERCEPT(12); + + return WRITE(server_ident.get_buffer(), "server ident", server_ready); } CtPtr ProtocolV2::server_ready() { @@ -3098,6 +3142,7 @@ CtPtr ProtocolV2::send_reconnect_ok() { connection->dispatch_queue->queue_accept(connection); messenger->ms_deliver_handle_fast_accept(connection); - bufferlist &bl = reconnect_ok.get_buffer(); - return WRITE(bl, "reconnect ok", server_ready); + INTERCEPT(14); + + return WRITE(reconnect_ok.get_buffer(), "reconnect ok", server_ready); } -- 2.39.5