namespace crimson::net {
#ifdef UNIT_TESTS_BUILT
+// should be consistent to intercept_frame() in FrameAssemblerV2.cc
void intercept(Breakpoint bp,
bp_type_t type,
Connection& conn,
intercept({bp}, type, conn, \
conn.interceptor, conn.socket)
-#define INTERCEPT_FRAME(tag, type) \
-intercept({static_cast<Tag>(tag), type}, \
- type, conn, \
- conn.interceptor, conn.socket)
-
#define INTERCEPT_N_RW(bp) \
if (conn.interceptor) { \
auto action = conn.interceptor->intercept(conn, {bp}); \
#else
#define INTERCEPT_CUSTOM(bp, type)
-#define INTERCEPT_FRAME(tag, type)
#define INTERCEPT_N_RW(bp)
#endif
execute_accepting();
}
-// TODO: Frame related implementations, probably to a separate class.
-
-seastar::future<FrameAssemblerV2::read_main_t>
-ProtocolV2::read_main_preamble()
-{
- return frame_assembler.read_main_preamble(
-#ifdef UNIT_TESTS_BUILT
- ).then([this](auto ret) {
- INTERCEPT_FRAME(ret.tag, bp_type_t::READ);
- return ret;
- });
-#else
- );
-#endif
-}
-
-template <class F>
-ceph::bufferlist ProtocolV2::get_buffer(F &tx_frame)
-{
- INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
- return frame_assembler.get_buffer(tx_frame);
-}
-
-template <class F>
-seastar::future<> ProtocolV2::write_flush_frame(F &tx_frame)
-{
- INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
- return frame_assembler.write_flush_frame(tx_frame);
-}
-
void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
{
if (!reentrant && _state == state) {
logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
conn, ceph_entity_type_name(messenger.get_mytype()),
conn.target_addr);
- return write_flush_frame(hello);
+ return frame_assembler.write_flush_frame(hello);
}).then([this] {
//5. read peer HelloFrame
- return read_main_preamble();
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::HELLO, ret.tag, conn, __func__);
return frame_assembler.read_frame_payload();
seastar::future<> ProtocolV2::handle_auth_reply()
{
- return read_main_preamble(
+ return frame_assembler.read_main_preamble(
).then([this](auto ret) {
switch (ret.tag) {
case Tag::AUTH_BAD_METHOD:
auto more_reply = AuthRequestMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
conn, reply.length());
- return write_flush_frame(more_reply);
+ return frame_assembler.write_flush_frame(more_reply);
}).then([this] {
return handle_auth_reply();
});
logger().debug("{} WRITE AuthRequestFrame: method={},"
" preferred_modes={}, payload_len={}",
conn, auth_method, preferred_modes, bl.length());
- return write_flush_frame(frame).then([this] {
+ return frame_assembler.write_flush_frame(frame).then([this] {
return handle_auth_reply();
});
} catch (const crimson::auth::error& e) {
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, client_cookie);
- return write_flush_frame(client_ident).then([this] {
- return read_main_preamble();
+ return frame_assembler.write_flush_frame(client_ident).then([this] {
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
switch (ret.tag) {
case Tag::IDENT_MISSING_FEATURES:
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
global_seq, connect_seq, get_in_seq());
- return write_flush_frame(reconnect).then([this] {
- return read_main_preamble();
+ return frame_assembler.write_flush_frame(reconnect).then([this] {
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
switch (ret.tag) {
case Tag::SESSION_RETRY_GLOBAL:
"allowed_methods={}, allowed_modes={})",
conn, auth_meta->auth_method, cpp_strerror(r),
allowed_methods, allowed_modes);
- return write_flush_frame(bad_method).then([this] {
+ return frame_assembler.write_flush_frame(bad_method).then([this] {
return server_auth();
});
}
logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
conn, conn.peer_global_id,
ceph_con_mode_name(auth_meta->con_mode), reply.length());
- return write_flush_frame(auth_done).then([this] {
+ return frame_assembler.write_flush_frame(auth_done).then([this] {
ceph_assert(auth_meta);
frame_assembler.create_session_stream_handlers(*auth_meta, true);
return finish_auth();
auto more = AuthReplyMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
conn, reply.length());
- return write_flush_frame(more).then([this] {
- return read_main_preamble();
+ return frame_assembler.write_flush_frame(more).then([this] {
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__);
return frame_assembler.read_frame_payload();
seastar::future<> ProtocolV2::server_auth()
{
- return read_main_preamble(
+ return frame_assembler.read_main_preamble(
).then([this](auto ret) {
expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__);
return frame_assembler.read_frame_payload();
{
auto wait = WaitFrame::Encode();
logger().debug("{} WRITE WaitFrame", conn);
- return write_flush_frame(wait).then([] {
+ return frame_assembler.write_flush_frame(wait).then([] {
return next_step_t::wait;
});
}
auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
conn, feat_missing);
- return write_flush_frame(ident_missing_features).then([] {
+ return frame_assembler.write_flush_frame(ident_missing_features).then([] {
return next_step_t::wait;
});
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::read_reconnect()
{
- return read_main_preamble(
+ return frame_assembler.read_main_preamble(
).then([this](auto ret) {
expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect");
return server_reconnect();
{
auto retry = RetryFrame::Encode(connect_seq);
logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
- return write_flush_frame(retry).then([this] {
+ return frame_assembler.write_flush_frame(retry).then([this] {
return read_reconnect();
});
}
{
auto retry = RetryGlobalFrame::Encode(global_seq);
logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
- return write_flush_frame(retry).then([this] {
+ return frame_assembler.write_flush_frame(retry).then([this] {
return read_reconnect();
});
}
{
auto reset = ResetFrame::Encode(full);
logger().warn("{} WRITE ResetFrame: full={}", conn, full);
- return write_flush_frame(reset).then([this] {
- return read_main_preamble();
+ return frame_assembler.write_flush_frame(reset).then([this] {
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
return server_connect();
messenger.learned_addr(_my_addr_from_peer, conn);
return server_auth();
}).then([this] {
- return read_main_preamble();
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
switch (ret.tag) {
case Tag::CLIENT_IDENT:
auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
auto sig_frame = AuthSignatureFrame::Encode(sig);
logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
- return write_flush_frame(sig_frame).then([this] {
- return read_main_preamble();
+ return frame_assembler.write_flush_frame(sig_frame).then([this] {
+ return frame_assembler.read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
return frame_assembler.read_frame_payload();
conn.policy.features_required | msgr2_required,
flags, server_cookie);
- return write_flush_frame(server_ident);
+ return frame_assembler.write_flush_frame(server_ident);
}
// REPLACING state
requeue_out_sent_up_to(new_msg_seq);
auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
- return write_flush_frame(reconnect_ok);
+ return frame_assembler.write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
assert(conn.get_peer_type() == new_peer_name.type());
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(get_buffer(keepalive_frame));
+ bl.append(frame_assembler.get_buffer(keepalive_frame));
}
if (unlikely(maybe_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
- bl.append(get_buffer(keepalive_ack_frame));
+ bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
}
if (require_ack && num_msgs == 0u) {
auto ack_frame = AckFrame::Encode(get_in_seq());
- bl.append(get_buffer(ack_frame));
+ bl.append(frame_assembler.get_buffer(ack_frame));
}
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
msg->get_payload(), msg->get_middle(), msg->get_data());
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(get_buffer(message));
+ bl.append(frame_assembler.get_buffer(message));
});
return bl;
gate.dispatch_in_background("execute_ready", *this, [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
- return read_main_preamble(
+ return frame_assembler.read_main_preamble(
).then([this](auto ret) {
switch (ret.tag) {
case Tag::MESSAGE: {