#include "Errors.h"
#include "SocketMessenger.h"
-#ifdef UNIT_TESTS_BUILT
-#include "Interceptor.h"
-#endif
-
using namespace ceph::msgr::v2;
using crimson::common::local_conf;
namespace crimson::net {
-#ifdef UNIT_TESTS_BUILT
-// should be consistent to intercept_frame() in FrameAssemblerV2.cc
-void intercept(Breakpoint bp,
- bp_type_t type,
- SocketConnection& conn,
- Interceptor *interceptor,
- Socket *socket) {
- if (interceptor) {
- auto action = interceptor->intercept(
- conn.get_local_shared_foreign_from_this(),
- Breakpoint(bp));
- socket->set_trap(type, action, &interceptor->blocker);
- }
-}
-
-#define INTERCEPT_CUSTOM(bp, type) \
-intercept({bp}, type, conn, \
- conn.interceptor, conn.socket)
-#else
-#define INTERCEPT_CUSTOM(bp, type)
-#endif
-
seastar::future<> ProtocolV2::Timer::backoff(double seconds)
{
logger().warn("{} waiting {} seconds ...", conn, seconds);
CRIMSON_MSGR2_SUPPORTED_FEATURES,
CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
- INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
- return frame_assembler->write_flush(std::move(bl)).then([this] {
- // 2. read peer banner
- unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
- INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
+ ).then([this, bl=std::move(bl)]() mutable {
+ return frame_assembler->write_flush(std::move(bl));
+ }
+#else
+ return frame_assembler->write_flush(std::move(bl)
+#endif
+ ).then([this] {
+ // 2. read peer banner
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
+ ).then([this, banner_len] {
return frame_assembler->read_exactly(banner_len);
- }).then([this](auto bptr) {
- // 3. process peer banner and read banner_payload
- unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
- logger().debug("{} RECV({}) banner: \"{}\"",
- conn, bptr.length(),
- std::string(bptr.c_str(), banner_prefix_len));
-
- if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
- if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
- logger().warn("{} peer is using V1 protocol", conn);
- } else {
- logger().warn("{} peer sent bad banner", conn);
- }
- abort_in_fault();
+ });
+#else
+ return frame_assembler->read_exactly(banner_len);
+#endif
+ }).then([this](auto bptr) {
+ // 3. process peer banner and read banner_payload
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bptr.length(),
+ std::string(bptr.c_str(), banner_prefix_len));
+
+ if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ logger().warn("{} peer is using V1 protocol", conn);
+ } else {
+ logger().warn("{} peer sent bad banner", conn);
}
+ abort_in_fault();
+ }
- bptr.set_offset(bptr.offset() + banner_prefix_len);
- bptr.set_length(bptr.length() - banner_prefix_len);
- assert(bptr.length() == sizeof(ceph_le16));
-
- uint16_t payload_len;
- bufferlist buf;
- buf.append(std::move(bptr));
- auto ti = buf.cbegin();
- try {
- decode(payload_len, ti);
- } catch (const buffer::error &e) {
- logger().warn("{} decode banner payload len failed", conn);
- abort_in_fault();
- }
- logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
- INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
+ bptr.set_offset(bptr.offset() + banner_prefix_len);
+ bptr.set_length(bptr.length() - banner_prefix_len);
+ assert(bptr.length() == sizeof(ceph_le16));
+
+ uint16_t payload_len;
+ bufferlist buf;
+ buf.append(std::move(bptr));
+ auto ti = buf.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload len failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(
+ custom_bp_t::BANNER_PAYLOAD_READ, false
+ ).then([this, payload_len] {
return frame_assembler->read(payload_len);
- }).then([this, is_connect] (bufferlist bl) {
- // 4. process peer banner_payload and send HelloFrame
- auto p = bl.cbegin();
- uint64_t _peer_supported_features;
- uint64_t _peer_required_features;
- try {
- decode(_peer_supported_features, p);
- decode(_peer_required_features, p);
- } catch (const buffer::error &e) {
- logger().warn("{} decode banner payload failed", conn);
- abort_in_fault();
- }
- logger().debug("{} RECV({}) banner features: supported={} required={}",
- conn, bl.length(),
- _peer_supported_features, _peer_required_features);
-
- // Check feature bit compatibility
- uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
- uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
- if ((required_features & _peer_supported_features) != required_features) {
- logger().error("{} peer does not support all required features"
- " required={} peer_supported={}",
- conn, required_features, _peer_supported_features);
- ABORT_IN_CLOSE(is_connect);
- }
- if ((supported_features & _peer_required_features) != _peer_required_features) {
- logger().error("{} we do not support all peer required features"
- " peer_required={} supported={}",
- conn, _peer_required_features, supported_features);
- ABORT_IN_CLOSE(is_connect);
- }
- peer_supported_features = _peer_supported_features;
- bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- frame_assembler->set_is_rev1(is_rev1);
-
- auto hello = HelloFrame::Encode(messenger.get_mytype(),
- conn.target_addr);
- logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
- conn, ceph_entity_type_name(messenger.get_mytype()),
- conn.target_addr);
- return frame_assembler->write_flush_frame(hello);
- }).then([this] {
- //5. read peer HelloFrame
- return frame_assembler->read_main_preamble();
- }).then([this](auto ret) {
- expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
- return frame_assembler->read_frame_payload();
- }).then([this](auto payload) {
- // 6. process peer HelloFrame
- auto hello = HelloFrame::Decode(payload->back());
- logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
- conn, ceph_entity_type_name(hello.entity_type()),
- hello.peer_addr());
- return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
- std::make_tuple(hello.entity_type(), hello.peer_addr()));
});
+#else
+ return frame_assembler->read(payload_len);
+#endif
+ }).then([this, is_connect] (bufferlist bl) {
+ // 4. process peer banner_payload and send HelloFrame
+ auto p = bl.cbegin();
+ uint64_t _peer_supported_features;
+ uint64_t _peer_required_features;
+ try {
+ decode(_peer_supported_features, p);
+ decode(_peer_required_features, p);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ _peer_supported_features, _peer_required_features);
+
+ // Check feature bit compatibility
+ uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ if ((required_features & _peer_supported_features) != required_features) {
+ logger().error("{} peer does not support all required features"
+ " required={} peer_supported={}",
+ conn, required_features, _peer_supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ if ((supported_features & _peer_required_features) != _peer_required_features) {
+ logger().error("{} we do not support all peer required features"
+ " peer_required={} supported={}",
+ conn, _peer_required_features, supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ peer_supported_features = _peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
+
+ auto hello = HelloFrame::Encode(messenger.get_mytype(),
+ conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
+ return frame_assembler->write_flush_frame(hello);
+ }).then([this] {
+ //5. read peer HelloFrame
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ // 6. process peer HelloFrame
+ auto hello = HelloFrame::Decode(payload->back());
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+ conn, ceph_entity_type_name(hello.entity_type()),
+ hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
+ });
}
// CONNECTING state
ceph_assert_always(!is_socket_valid);
trigger_state(state_t::CONNECTING, io_state_t::delay);
gated_execute("execute_connecting", conn, [this] {
- global_seq = messenger.get_global_seq();
- assert(client_cookie != 0);
- if (!conn.policy.lossy && server_cookie != 0) {
- ++connect_seq;
- logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
- conn, global_seq, connect_seq);
- } else { // conn.policy.lossy || server_cookie == 0
- assert(connect_seq == 0);
- assert(server_cookie == 0);
- logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
- }
- return wait_exit_io().then([this] {
+ global_seq = messenger.get_global_seq();
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+ return wait_exit_io().then([this] {
#ifdef UNIT_TESTS_BUILT
- // process custom_bp_t::SOCKET_CONNECTING
- // supports CONTINUE/FAULT/BLOCK
- if (conn.interceptor) {
- auto action = conn.interceptor->intercept(
- conn.get_local_shared_foreign_from_this(),
- {custom_bp_t::SOCKET_CONNECTING});
- switch (action) {
- case bp_action_t::CONTINUE:
- return seastar::now();
- case bp_action_t::FAULT:
- logger().info("[Test] got FAULT");
- abort_in_fault();
- case bp_action_t::BLOCK:
- logger().info("[Test] got BLOCK");
- return conn.interceptor->blocker.block();
- default:
- ceph_abort("unexpected action from trap");
- return seastar::now();
- }
- } else {
- return seastar::now();
- }
- }).then([this] {
-#endif
- ceph_assert_always(frame_assembler);
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} before Socket::connect()",
- conn, get_state_name(state));
- abort_protocol();
- }
- return Socket::connect(conn.peer_addr);
- }).then([this](SocketRef _new_socket) {
- logger().debug("{} socket connected", conn);
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} during Socket::connect()",
- conn, get_state_name(state));
- return _new_socket->close().then([sock=std::move(_new_socket)] {
- abort_protocol();
- });
- }
- SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
- if (!has_socket) {
- frame_assembler->set_socket(std::move(new_socket));
- has_socket = true;
- } else {
- gate.dispatch_in_background(
- "replace_socket_connecting",
- conn,
- [this, new_socket=std::move(new_socket)]() mutable {
- return frame_assembler->replace_shutdown_socket(std::move(new_socket));
- }
- );
- }
- is_socket_valid = true;
+ // process custom_bp_t::SOCKET_CONNECTING
+ // supports CONTINUE/FAULT/BLOCK
+ if (!conn.interceptor) {
+ return seastar::now();
+ }
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
+ ).then([this](bp_action_t action) {
+ switch (action) {
+ case bp_action_t::CONTINUE:
return seastar::now();
- }).then([this] {
- auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- frame_assembler->reset_handlers();
- frame_assembler->start_recording();
- return banner_exchange(true);
- }).then([this] (auto&& ret) {
- auto [_peer_type, _my_addr_from_peer] = std::move(ret);
- if (conn.get_peer_type() != _peer_type) {
- logger().warn("{} connection peer type does not match what peer advertises {} != {}",
- conn, ceph_entity_type_name(conn.get_peer_type()),
- ceph_entity_type_name(_peer_type));
- ABORT_IN_CLOSE(true);
- }
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} during banner_exchange(), abort",
- conn, get_state_name(state));
- abort_protocol();
- }
- frame_assembler->learn_socket_ephemeral_port_as_connector(
- _my_addr_from_peer.get_port());
- if (unlikely(_my_addr_from_peer.is_legacy())) {
- logger().warn("{} peer sent a legacy address for me: {}",
- conn, _my_addr_from_peer);
- throw std::system_error(
- make_error_code(crimson::net::error::bad_peer_address));
- }
- _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
- messenger.learned_addr(_my_addr_from_peer, conn);
- return client_auth();
- }).then([this] {
- if (server_cookie == 0) {
- ceph_assert(connect_seq == 0);
- return client_connect();
- } else {
- ceph_assert(connect_seq > 0);
- return client_reconnect();
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ abort_in_fault();
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return conn.interceptor->blocker.block();
+ default:
+ ceph_abort("unexpected action from trap");
+ return seastar::now();
+ }
+ });;
+ }).then([this] {
+#endif
+ ceph_assert_always(frame_assembler);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketRef _new_socket) {
+ logger().debug("{} socket connected", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return _new_socket->close().then([sock=std::move(_new_socket)] {
+ abort_protocol();
+ });
+ }
+ SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
+ if (!has_socket) {
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ } else {
+ gate.dispatch_in_background(
+ "replace_socket_connecting",
+ conn,
+ [this, new_socket=std::move(new_socket)]() mutable {
+ return frame_assembler->replace_shutdown_socket(std::move(new_socket));
}
- }).then([this] (next_step_t next) {
+ );
+ }
+ is_socket_valid = true;
+ return seastar::now();
+ }).then([this] {
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ if (conn.get_peer_type() != _peer_type) {
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
+ ABORT_IN_CLOSE(true);
+ }
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ frame_assembler->learn_socket_ephemeral_port_as_connector(
+ _my_addr_from_peer.get_port());
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return client_auth();
+ }).then([this] {
+ if (server_cookie == 0) {
+ ceph_assert(connect_seq == 0);
+ return client_connect();
+ } else {
+ ceph_assert(connect_seq > 0);
+ return client_reconnect();
+ }
+ }).then([this] (next_step_t next) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at the end of execute_connecting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ switch (next) {
+ case next_step_t::ready: {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_connect()
+ crosscore.prepare_submit();
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "send {} IOHandler::dispatch_connect()",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id(), cc_seq);
+
+ // set io_handler to a new shard
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_connect(
+ cc_seq, new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} at the end of execute_connecting()",
+ logger().debug("{} triggered {} after dispatch_connect(), abort",
conn, get_state_name(state));
abort_protocol();
}
- switch (next) {
- case next_step_t::ready: {
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} before dispatch_connect(), abort",
- conn, get_state_name(state));
- abort_protocol();
- }
-
- auto cc_seq = crosscore.prepare_submit();
- // there are 2 hops with dispatch_connect()
- crosscore.prepare_submit();
- logger().info("{} connected: gs={}, pgs={}, cs={}, "
- "client_cookie={}, server_cookie={}, {}, new_sid={}, "
- "send {} IOHandler::dispatch_connect()",
- conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, io_states,
- frame_assembler->get_socket_shard_id(), cc_seq);
-
- // set io_handler to a new shard
- auto new_io_shard = frame_assembler->get_socket_shard_id();
- ConnectionFRef conn_fref = seastar::make_foreign(
- conn.shared_from_this());
- ceph_assert_always(!pr_switch_io_shard.has_value());
- pr_switch_io_shard = seastar::shared_promise<>();
- return seastar::smp::submit_to(
- io_handler.get_shard_id(),
- [this, cc_seq, new_io_shard,
- conn_fref=std::move(conn_fref)]() mutable {
- return io_handler.dispatch_connect(
- cc_seq, new_io_shard, std::move(conn_fref));
- }).then([this, new_io_shard] {
- ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
- pr_switch_io_shard->set_value();
- pr_switch_io_shard = std::nullopt;
- // user can make changes
-
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} after dispatch_connect(), abort",
- conn, get_state_name(state));
- abort_protocol();
- }
- execute_ready();
- });
- }
- case next_step_t::wait: {
- logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
- ceph_assert_always(is_socket_valid);
- frame_assembler->shutdown_socket<true>(&gate);
- is_socket_valid = false;
- execute_wait(true);
- return seastar::now();
- }
- default: {
- ceph_abort("impossible next step");
- }
- }
- }).handle_exception([this](std::exception_ptr eptr) {
- fault(state_t::CONNECTING, "execute_connecting", eptr);
+ execute_ready();
});
+ }
+ case next_step_t::wait: {
+ logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+ ceph_assert_always(is_socket_valid);
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ execute_wait(true);
+ return seastar::now();
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::CONNECTING, "execute_connecting", eptr);
});
+ });
}
// ACCEPTING state
assert(is_socket_valid);
trigger_state(state_t::ACCEPTING, io_state_t::none);
gate.dispatch_in_background("execute_accepting", conn, [this] {
- return seastar::futurize_invoke([this] {
+ return seastar::futurize_invoke([this] {
#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- auto action = conn.interceptor->intercept(
- conn.get_local_shared_foreign_from_this(),
- {custom_bp_t::SOCKET_ACCEPTED});
- switch (action) {
- case bp_action_t::CONTINUE:
- break;
- case bp_action_t::FAULT:
- logger().info("[Test] got FAULT");
- abort_in_fault();
- default:
- ceph_abort("unexpected action from trap");
- }
- }
-#endif
- auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- frame_assembler->reset_handlers();
- frame_assembler->start_recording();
- return banner_exchange(false);
- }).then([this] (auto&& ret) {
- auto [_peer_type, _my_addr_from_peer] = std::move(ret);
- ceph_assert(conn.get_peer_type() == 0);
- conn.set_peer_type(_peer_type);
-
- conn.policy = messenger.get_policy(_peer_type);
- logger().info("{} UPDATE: peer_type={},"
- " policy(lossy={} server={} standby={} resetcheck={})",
- conn, ceph_entity_type_name(_peer_type),
- conn.policy.lossy, conn.policy.server,
- conn.policy.standby, conn.policy.resetcheck);
- if (!messenger.get_myaddr().is_blank_ip() &&
- (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
- messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
- logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
- conn, _my_addr_from_peer, messenger.get_myaddr());
- throw std::system_error(
- make_error_code(crimson::net::error::bad_peer_address));
- }
- messenger.learned_addr(_my_addr_from_peer, conn);
- return server_auth();
- }).then([this] {
- return frame_assembler->read_main_preamble();
- }).then([this](auto ret) {
- switch (ret.tag) {
- case Tag::CLIENT_IDENT:
- return server_connect();
- case Tag::SESSION_RECONNECT:
- return server_reconnect();
- default: {
- unexpected_tag(ret.tag, conn, "post_server_auth");
- return seastar::make_ready_future<next_step_t>(next_step_t::none);
- }
- }
- }).then([this] (next_step_t next) {
- switch (next) {
- case next_step_t::ready:
- assert(state != state_t::ACCEPTING);
- break;
- case next_step_t::wait:
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} at the end of execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
- logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
- execute_server_wait();
- break;
- default:
- ceph_abort("impossible next step");
- }
- }).handle_exception([this](std::exception_ptr eptr) {
- const char *e_what;
- try {
- std::rethrow_exception(eptr);
- } catch (std::exception &e) {
- e_what = e.what();
- }
- logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
- conn, get_state_name(state), e_what);
- do_close(false);
+ if (conn.interceptor) {
+ // only notify socket accepted
+ gate.dispatch_in_background(
+ "test_intercept_socket_accepted", conn, [this] {
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
+ ).then([](bp_action_t action) {
+ ceph_assert(action == bp_action_t::CONTINUE);
+ });
});
+ }
+#endif
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ ceph_assert(conn.get_peer_type() == 0);
+ conn.set_peer_type(_peer_type);
+
+ conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ if (!messenger.get_myaddr().is_blank_ip() &&
+ (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return server_auth();
+ }).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::CLIENT_IDENT:
+ return server_connect();
+ case Tag::SESSION_RECONNECT:
+ return server_reconnect();
+ default: {
+ unexpected_tag(ret.tag, conn, "post_server_auth");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
+ break;
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+ execute_server_wait();
+ break;
+ default:
+ ceph_abort("impossible next step");
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), e_what);
+ do_close(false);
});
+ });
}
// CONNECTING or ACCEPTING state