#define READB(L, B, C) read(CONTINUATION(C), L, B)
+#define INTERCEPT(S) { \
+if(connection->interceptor) { \
+ auto a = connection->interceptor->intercept(connection, (S)); \
+ if (a == Interceptor::ACTION::FAIL) { \
+ return _fault(); \
+ } else if (a == Interceptor::ACTION::STOP) { \
+ stop(); \
+ connection->dispatch_queue->queue_reset(connection); \
+ return nullptr; \
+ }}}
+
static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
// create a buffer to read into that matches the data alignment
unsigned alloc_len = 0;
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
+ INTERCEPT(state == CONNECTING ? 3 : 4);
+
return WRITE(bl, "banner", _wait_for_peer_banner);
}
// temp_buffer size as well
next_payload_len = payload_len;
+
+ INTERCEPT(state == CONNECTING ? 5 : 6);
+
return READ(next_payload_len, _handle_peer_banner_payload);
}
}
HelloFrame hello(this, messenger->get_mytype(), connection->target_addr);
+
+ INTERCEPT(state == CONNECTING ? 7 : 8);
+
return WRITE(hello.get_buffer(), "hello frame", read_frame);
}
<< server_cookie << std::dec << " in_seq=" << in_seq
<< " out_seq=" << out_seq << dendl;
+ INTERCEPT(15);
+
return CONTINUE(read_frame);
}
}
}
+ INTERCEPT(16);
+
// Reset state
data_buf.clear();
front.clear();
return _fault();
}
+ INTERCEPT(17);
+
message->set_byte_throttler(connection->policy.throttler_bytes);
message->set_message_throttler(connection->policy.throttler_messages);
CtPtr ProtocolV2::start_client_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;
+
+ INTERCEPT(1);
+
state = CONNECTING;
global_seq = messenger->get_global_seq();
connection->dispatch_queue->queue_reset(connection);
return nullptr;
}
+
+ INTERCEPT(9);
+
AuthRequestFrame frame(auth_meta->auth_method, preferred_modes, bl);
return WRITE(frame.get_buffer(), "auth request", read_frame);
}
<< " flags=" << flags
<< " cookie=" << client_cookie << std::dec << dendl;
- bufferlist &bl = client_ident.get_buffer();
- return WRITE(bl, "client ident", read_frame);
+ INTERCEPT(11);
+
+ return WRITE(client_ident.get_buffer(), "client ident", read_frame);
}
CtPtr ProtocolV2::send_reconnect() {
<< server_cookie << std::dec
<< " gs=" << global_seq << " cs=" << connect_seq
<< " ms=" << in_seq << dendl;
+
+ INTERCEPT(13);
+
return WRITE(reconnect.get_buffer(), "reconnect", read_frame);
}
CtPtr ProtocolV2::start_server_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;
+
+ INTERCEPT(2);
+
state = ACCEPTING;
return _banner_exchange(CONTINUATION(post_server_banner_exchange));
return _fault();
}
if (r == 1) {
+ INTERCEPT(10);
+
AuthDoneFrame auth_done(connection->peer_global_id, auth_meta->con_mode,
reply);
return WRITE(auth_done.get_buffer(), "auth done", read_frame);
<< " cgs=" << reconnect.global_seq()
<< ", ask client to retry global" << dendl;
RetryGlobalFrame retry(this, exproto->peer_global_seq);
+
+ INTERCEPT(18);
+
return WRITE(retry.get_buffer(), "session retry", read_frame);
}
connection->dispatch_queue->queue_accept(connection);
messenger->ms_deliver_handle_fast_accept(connection);
- bufferlist &bl = server_ident.get_buffer();
- return WRITE(bl, "server ident", server_ready);
+ INTERCEPT(12);
+
+ return WRITE(server_ident.get_buffer(), "server ident", server_ready);
}
CtPtr ProtocolV2::server_ready() {
connection->dispatch_queue->queue_accept(connection);
messenger->ms_deliver_handle_fast_accept(connection);
- bufferlist &bl = reconnect_ok.get_buffer();
- return WRITE(bl, "reconnect ok", server_ready);
+ INTERCEPT(14);
+
+ return WRITE(reconnect_ok.get_buffer(), "reconnect ok", server_ready);
}