From: Yingxin Cheng Date: Thu, 11 Jul 2019 12:38:25 +0000 (+0800) Subject: crimson/net: clean up loggings in v2 protocol X-Git-Tag: v15.1.0~2161^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b86632ed1eb7a1167167b8f96332c5a53bf4d944;p=ceph.git crimson/net: clean up loggings in v2 protocol Defind log levels in V2 Protocol: * error level, something error that cause connection to terminate: - fatal errors; - bugs; * warn level: something unusual that identifies connection fault or replacement: - unstable network; - incompatible peer; - auth failure; - connection race; - connection reset; * info level, something very important to show connection lifecycle, which doesn't happen very frequently; * debug level, important logs for debugging, including: - all the messages sent/received (-->/<==); - all the frames exchanged (WRITE/GOT); - important fields updated (UPDATE); - connection state transitions (TRIGGER); * trace level, trivial logs showing: - the exact bytes being sent/received (SEND/RECV(bytes)); - detailed information of sub-frames; - integrity checks; - etc. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 47216d096121..e7b529ce49a7 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -27,6 +27,29 @@ using namespace ceph::msgr::v2; namespace { +// TODO: apply the same logging policy to Protocol V1 +// Log levels in V2 Protocol: +// * error level, something error that cause connection to terminate: +// - fatal errors; +// - bugs; +// * warn level: something unusual that identifies connection fault or replacement: +// - unstable network; +// - incompatible peer; +// - auth failure; +// - connection race; +// - connection reset; +// * info level, something very important to show connection lifecycle, +// which doesn't happen very frequently; +// * debug level, important logs for debugging, including: +// - all the messages sent/received (-->/<==); +// - all the frames exchanged (WRITE/GOT); +// - important fields updated (UPDATE); +// - connection state transitions (TRIGGER); +// * trace level, trivial logs showing: +// - the exact bytes being sent/received (SEND/RECV(bytes)); +// - detailed information of sub-frames; +// - integrity checks; +// - etc. seastar::logger& logger() { return ceph::get_logger(ceph_subsys_ms); } @@ -44,10 +67,10 @@ inline void expect_tag(const Tag& expected, ceph::net::SocketConnection& conn, const char *where) { if (actual != expected) { - logger().error("{} {} received wrong tag: {}, expected {}", - conn, where, - static_cast(actual), - static_cast(expected)); + logger().warn("{} {} received wrong tag: {}, expected {}", + conn, where, + static_cast(actual), + static_cast(expected)); abort_in_fault(); } } @@ -55,8 +78,8 @@ inline void expect_tag(const Tag& expected, inline void unexpected_tag(const Tag& unexpected, ceph::net::SocketConnection& conn, const char *where) { - logger().error("{} {} received unexpected tag: {}", - conn, where, static_cast(unexpected)); + logger().warn("{} {} received unexpected tag: {}", + conn, where, static_cast(unexpected)); abort_in_fault(); } @@ -110,6 +133,10 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, conn.target_addr = _peer_addr; conn.set_peer_type(_peer_type); conn.policy = messenger.get_policy(_peer_type); + logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}," + " policy(lossy={}, server={}, standby={}, resetcheck={})", + conn, _peer_addr, ceph_entity_type_name(_peer_type), conn.policy.lossy, + conn.policy.server, conn.policy.standby, conn.policy.resetcheck); messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); execute_connecting(); @@ -122,6 +149,7 @@ void ProtocolV2::start_accept(SocketFRef&& sock, ceph_assert(!socket); conn.target_addr = _peer_addr; socket = std::move(sock); + logger().info("{} ProtocolV2::start_accept(): peer_addr={}", conn, _peer_addr); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); execute_accepting(); @@ -206,28 +234,30 @@ seastar::future ProtocolV2::read_main_preamble() // everything through ::Decode is unnecessary. const auto& main_preamble = \ *reinterpret_cast(bl.get()); + logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}", + conn, bl.size(), (int)main_preamble.tag, + (int)main_preamble.num_segments, main_preamble.crc); // verify preamble's CRC before any further processing const auto rx_crc = ceph_crc32c(0, reinterpret_cast(&main_preamble), sizeof(main_preamble) - sizeof(main_preamble.crc)); if (rx_crc != main_preamble.crc) { - logger().error("{} crc mismatch for main preamble rx_crc={} tx_crc={}", - conn, rx_crc, main_preamble.crc); + logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}", + conn, rx_crc, main_preamble.crc); abort_in_fault(); } - logger().trace("{} read main preamble: tag={}, len={}", conn, (int)main_preamble.tag, bl.size()); // currently we do support between 1 and MAX_NUM_SEGMENTS segments if (main_preamble.num_segments < 1 || main_preamble.num_segments > MAX_NUM_SEGMENTS) { - logger().error("{} unsupported num_segments={}", - conn, main_preamble.num_segments); + logger().warn("{} unsupported num_segments={}", + conn, main_preamble.num_segments); abort_in_fault(); } if (main_preamble.num_segments > MAX_NUM_SEGMENTS) { - logger().error("{} num_segments too much: {}", - conn, main_preamble.num_segments); + logger().warn("{} num_segments too much: {}", + conn, main_preamble.num_segments); abort_in_fault(); } @@ -235,7 +265,7 @@ seastar::future ProtocolV2::read_main_preamble() rx_segments_data.clear(); for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) { - logger().trace("{} got new segment: len={} align={}", + logger().trace("{} GOT frame segment: len={} align={}", conn, main_preamble.segments[idx].length, main_preamble.segments[idx].alignment); rx_segments_desc.emplace_back(main_preamble.segments[idx]); @@ -263,10 +293,10 @@ seastar::future<> ProtocolV2::read_frame_payload() // TODO: create aligned and contiguous buffer from socket return read_exactly(cur_rx_desc.length) .then([this] (auto tmp_bl) { + logger().trace("{} RECV({}) frame segment[{}]", + conn, tmp_bl.size(), rx_segments_data.size()); bufferlist data; data.append(buffer::create(std::move(tmp_bl))); - logger().trace("{} read frame segment[{}], length={}", - conn, rx_segments_data.size(), data.length()); if (session_stream_handlers.rx) { // TODO ceph_assert(false); @@ -279,7 +309,7 @@ seastar::future<> ProtocolV2::read_frame_payload() ceph_assert(!session_stream_handlers.rx); return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE); }).then([this] (auto bl) { - logger().trace("{} read frame epilogue length={}", conn, bl.size()); + logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); __u8 late_flags; if (session_stream_handlers.rx) { @@ -291,9 +321,9 @@ seastar::future<> ProtocolV2::read_frame_payload() const __u32 expected_crc = epilogue.crc_values[idx]; const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1); if (expected_crc != calculated_crc) { - logger().error("{} message integrity check failed at index {}:" - " expected_crc={} calculated_crc={}", - conn, (unsigned int)idx, expected_crc, calculated_crc); + logger().warn("{} message integrity check failed at index {}:" + " expected_crc={} calculated_crc={}", + conn, (unsigned int)idx, expected_crc, calculated_crc); abort_in_fault(); } else { logger().trace("{} message integrity check success at index {}: crc={}", @@ -302,6 +332,8 @@ seastar::future<> ProtocolV2::read_frame_payload() } late_flags = epilogue.late_flags; } + logger().trace("{} GOT frame epilogue: late_flags={}", + conn, (unsigned)late_flags); // we do have a mechanism that allows transmitter to start sending message // and abort after putting entire data field on wire. This will be used by @@ -317,8 +349,10 @@ template seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) { auto bl = frame.get_buffer(session_stream_handlers); - logger().trace("{} write frame: tag={}, len={}", conn, - static_cast(frame.tag), bl.length()); + const auto main_preamble = reinterpret_cast(bl.front().c_str()); + logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", + conn, bl.length(), (int)main_preamble->tag, + (int)main_preamble->num_segments, main_preamble->crc); if (flush) { return write_flush(std::move(bl)); } else { @@ -333,7 +367,7 @@ void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool conn, get_state_name(state)); ceph_assert(false); } - logger().trace("{} trigger {}, was {}", + logger().debug("{} TRIGGER {}, was {}", conn, get_state_name(_state), get_state_name(state)); state = _state; set_write_state(_write_state); @@ -399,8 +433,14 @@ seastar::future ProtocolV2::banner_exchange() bufferlist bl; bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); - encode((uint16_t)banner_payload.length(), bl, 0); + auto len_payload = static_cast(banner_payload.length()); + encode(len_payload, bl, 0); bl.claim_append(banner_payload); + logger().debug("{} SEND({}) banner: len_payload={}, supported={}, " + "required={}, banner=\"{}\"", + conn, bl.length(), len_payload, + CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES, + CEPH_BANNER_V2_PREFIX); return write_flush(std::move(bl)).then([this] { // 2. read peer banner unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16); @@ -408,12 +448,15 @@ seastar::future ProtocolV2::banner_exchange() }).then([this] (auto bl) { // 3. process peer banner and read banner_payload unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); + logger().debug("{} RECV({}) banner: \"{}\"", + conn, bl.size(), + std::string((const char*)bl.get(), banner_prefix_len)); if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { - logger().error("{} peer is using V1 protocol", conn); + logger().warn("{} peer is using V1 protocol", conn); } else { - logger().error("{} peer sent bad banner", conn); + logger().warn("{} peer sent bad banner", conn); } abort_in_fault(); } @@ -426,9 +469,10 @@ seastar::future ProtocolV2::banner_exchange() try { decode(payload_len, ti); } catch (const buffer::error &e) { - logger().error("{} decode banner payload len failed", conn); + logger().warn("{} decode banner payload len failed", conn); abort_in_fault(); } + logger().debug("{} GOT banner: payload_len={}", conn, payload_len); return read(payload_len); }).then([this] (bufferlist bl) { // 4. process peer banner_payload and send HelloFrame @@ -439,11 +483,12 @@ seastar::future ProtocolV2::banner_exchange() decode(peer_supported_features, p); decode(peer_required_features, p); } catch (const buffer::error &e) { - logger().error("{} decode banner payload failed", conn); + logger().warn("{} decode banner payload failed", conn); abort_in_fault(); } - logger().trace("{} supported={} required={}", - conn, peer_supported_features, peer_required_features); + logger().debug("{} RECV({}) banner features: supported={} required={}", + conn, bl.length(), + peer_supported_features, peer_required_features); // Check feature bit compatibility uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; @@ -467,6 +512,9 @@ seastar::future ProtocolV2::banner_exchange() auto hello = HelloFrame::Encode(messenger.get_mytype(), conn.target_addr); + logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", + conn, ceph_entity_type_name(messenger.get_mytype()), + conn.target_addr); return write_frame(hello); }).then([this] { //5. read peer HelloFrame @@ -477,7 +525,7 @@ seastar::future ProtocolV2::banner_exchange() }).then([this] { // 6. process peer HelloFrame auto hello = HelloFrame::Decode(rx_segments_data.back()); - logger().trace("{} received hello: peer_type={} peer_addr_for_me={}", + logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", conn, ceph_entity_type_name(hello.entity_type()), hello.peer_addr()); return seastar::make_ready_future( @@ -496,8 +544,8 @@ seastar::future<> ProtocolV2::handle_auth_reply() return read_frame_payload().then([this] { // handle_auth_bad_method() logic auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back()); - logger().warn("{} got AuthBadMethod, method={} reslt={}, " - "allowed methods={}, allowed modes={}", + logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, " + "allowed_methods={}, allowed_modes={}", conn, bad_method.method(), cpp_strerror(bad_method.result()), bad_method.allowed_methods(), bad_method.allowed_modes()); ceph_assert(messenger.get_auth_client()); @@ -506,8 +554,8 @@ seastar::future<> ProtocolV2::handle_auth_reply() bad_method.method(), bad_method.result(), bad_method.allowed_methods(), bad_method.allowed_modes()); if (r < 0) { - logger().error("{} auth_client handle_auth_bad_method returned {}", - conn, r); + logger().warn("{} auth_client handle_auth_bad_method returned {}", + conn, r); abort_in_fault(); } return client_auth(bad_method.allowed_methods()); @@ -516,13 +564,15 @@ seastar::future<> ProtocolV2::handle_auth_reply() return read_frame_payload().then([this] { // handle_auth_reply_more() logic auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back()); - logger().trace("{} auth reply more len={}", + logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}", conn, auth_more.auth_payload().length()); ceph_assert(messenger.get_auth_client()); // let execute_connecting() take care of the thrown exception auto reply = messenger.get_auth_client()->handle_auth_reply_more( conn.shared_from_this(), auth_meta, auth_more.auth_payload()); auto more_reply = AuthRequestMoreFrame::Encode(reply); + logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", + conn, reply.length()); return write_frame(more_reply); }).then([this] { return handle_auth_reply(); @@ -531,6 +581,10 @@ seastar::future<> ProtocolV2::handle_auth_reply() return read_frame_payload().then([this] { // handle_auth_done() logic auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}", + conn, auth_done.global_id(), + ceph_con_mode_name(auth_done.con_mode()), + auth_done.auth_payload().length()); ceph_assert(messenger.get_auth_client()); int r = messenger.get_auth_client()->handle_auth_done( conn.shared_from_this(), auth_meta, @@ -538,7 +592,7 @@ seastar::future<> ProtocolV2::handle_auth_reply() auth_done.con_mode(), auth_done.auth_payload()); if (r < 0) { - logger().error("{} auth_client handle_auth_done returned {}", conn, r); + logger().warn("{} auth_client handle_auth_done returned {}", conn, r); abort_in_fault(); } auth_meta->con_mode = auth_done.con_mode(); @@ -565,6 +619,9 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta); auth_meta->auth_method = auth_method; auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl); + logger().debug("{} WRITE AuthRequestFrame: method={}," + " preferred_modes={}, payload_len={}", + conn, auth_method, preferred_modes, bl.length()); return write_frame(frame).then([this] { return handle_auth_reply(); }); @@ -580,7 +637,7 @@ seastar::future ProtocolV2::process_wait() { return read_frame_payload().then([this] { // handle_wait() logic - logger().trace("{} received WAIT (connection race)", conn); + logger().warn("{} GOT WaitFrame", conn); WaitFrame::Decode(rx_segments_data.back()); return false; }); @@ -597,7 +654,6 @@ seastar::future ProtocolV2::client_connect() entity_addr_t a; a.u.sa.sa_family = AF_INET; a.set_type(entity_addr_t::TYPE_MSGR2); - logger().debug("{} learn from addr {}", conn, a); return messenger.learned_addr(a).then([this] { uint64_t flags = 0; if (conn.policy.lossy) { @@ -613,9 +669,9 @@ seastar::future ProtocolV2::client_connect() conn.policy.features_required | msgr2_required, flags, client_cookie); - logger().trace("{} sending identification: addrs={} target={} gid={}" - " global_seq={} features_supported={} features_required={}" - " flags={} cookie={}", + logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", conn, messenger.get_myaddrs(), conn.target_addr, messenger.get_myname().num(), global_seq, conn.policy.features_supported, @@ -630,8 +686,9 @@ seastar::future ProtocolV2::client_connect() return read_frame_payload().then([this] { // handle_ident_missing_features() logic auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back()); - logger().error("{} client does not support all server features: {}", - conn, ident_missing.features()); + logger().warn("{} GOT IdentMissingFeaturesFrame: features={}" + " (client does not support all server features)", + conn, ident_missing.features()); abort_in_fault(); // won't be executed return false; @@ -642,10 +699,10 @@ seastar::future ProtocolV2::client_connect() return read_frame_payload().then([this] { // handle_server_ident() logic auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); - logger().trace("{} received server identification:" - " addrs={} gid={} global_seq={}" - " features_supported={} features_required={}" - " flags={} cookie={}", + logger().debug("{} GOT ServerIdentFrame:" + " addrs={}, gid={}, gs={}," + " features_supported={}, features_required={}," + " flags={}, cookie={}", conn, server_ident.addrs(), server_ident.gid(), server_ident.global_seq(), @@ -675,8 +732,6 @@ seastar::future ProtocolV2::client_connect() ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY); conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; // TODO: backoff = utime_t(); - logger().trace("{} connect success {}, lossy={}, features={}", - conn, connect_seq, conn.policy.lossy, conn.features); return dispatcher.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())) @@ -705,8 +760,8 @@ seastar::future ProtocolV2::client_reconnect() global_seq, connect_seq, conn.in_seq); - logger().trace("{} reconnect to session: client_cookie={}" - " server_cookie={} gs={} cs={} ms={}", + logger().debug("{} WRITE ReconnectFrame: client_cookie={}," + " server_cookie={}, gs={}, cs={}, msg_seq={}", conn, client_cookie, server_cookie, global_seq, connect_seq, conn.in_seq); return write_frame(reconnect).then([this] { @@ -717,26 +772,27 @@ seastar::future ProtocolV2::client_reconnect() return read_frame_payload().then([this] { // handle_session_retry_global() logic auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT RetryGlobalFrame: gs={}", + conn, retry.global_seq()); global_seq = messenger.get_global_seq(retry.global_seq()); - logger().warn("{} received session retry global " - "global_seq={}, choose new gs={}", - conn, retry.global_seq(), global_seq); + logger().warn("{} UPDATE: gs={}", conn, global_seq); return client_reconnect(); }); case Tag::SESSION_RETRY: return read_frame_payload().then([this] { // handle_session_retry() logic auto retry = RetryFrame::Decode(rx_segments_data.back()); + logger().warn("{} GOT RetryFrame: cs={}", + conn, retry.connect_seq()); connect_seq = retry.connect_seq() + 1; - logger().warn("{} received session retry connect_seq={}, inc to cs={}", - conn, retry.connect_seq(), connect_seq); + logger().warn("{} UPDATE: cs={}", conn, connect_seq); return client_reconnect(); }); case Tag::SESSION_RESET: return read_frame_payload().then([this] { // handle_session_reset() logic auto reset = ResetFrame::Decode(rx_segments_data.back()); - logger().warn("{} received session reset full={}", reset.full()); + logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); reset_session(reset.full()); return client_connect(); }); @@ -746,10 +802,8 @@ seastar::future ProtocolV2::client_reconnect() return read_frame_payload().then([this] { // handle_reconnect_ok() logic auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); - logger().trace("{} received reconnect ok:" - "sms={}, lossy={}, features={}", - conn, reconnect_ok.msg_seq(), - connect_seq, conn.policy.lossy, conn.features); + logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", + conn, reconnect_ok.msg_seq()); // TODO // discard_requeued_up_to() // backoff = utime_t(); @@ -778,12 +832,14 @@ void ProtocolV2::execute_connecting() seastar::with_gate(pending_dispatch, [this] { enable_recording(); global_seq = messenger.get_global_seq(); + logger().debug("{} UPDATE: gs={}", conn, global_seq); return Socket::connect(conn.peer_addr) .then([this](SocketFRef sock) { + logger().debug("{} socket connected", conn); socket = std::move(sock); if (state == state_t::CLOSING) { return socket->close().then([this] { - logger().info("{} is closed during Socket::connect()", conn); + logger().warn("{} is closed during Socket::connect()", conn); abort_in_fault(); }); } @@ -793,15 +849,14 @@ void ProtocolV2::execute_connecting() }).then([this] (entity_type_t _peer_type, entity_addr_t _my_addr_from_peer) { if (conn.get_peer_type() != _peer_type) { - logger().debug("{} connection peer type does not match what peer advertises {} != {}", - conn, ceph_entity_type_name(conn.get_peer_type()), - ceph_entity_type_name(_peer_type)); + logger().warn("{} connection peer type does not match what peer advertises {} != {}", + conn, ceph_entity_type_name(conn.get_peer_type()), + ceph_entity_type_name(_peer_type)); dispatch_reset(); abort_in_close(); } if (messenger.get_myaddrs().empty() || messenger.get_myaddrs().front().is_blank_ip()) { - logger().trace("peer {} says I am {}", conn.target_addr, _my_addr_from_peer); return messenger.learned_addr(_my_addr_from_peer); } else { return seastar::now(); @@ -820,6 +875,10 @@ void ProtocolV2::execute_connecting() } }).then([this] (bool proceed_or_wait) { if (proceed_or_wait) { + logger().info("{} connected: gs={}, pgs={}, cs={}," + " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, conn.in_seq, conn.out_seq); execute_ready(); } else { execute_wait(); @@ -839,12 +898,12 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r) ceph_assert(r < 0); auto [allowed_methods, allowed_modes] = messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type()); - logger().warn("{} send AuthBadMethod(auth_method={}, r={}, " + auto bad_method = AuthBadMethodFrame::Encode( + auth_meta->auth_method, r, allowed_methods, allowed_modes); + logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, " "allowed_methods={}, allowed_modes={})", conn, auth_meta->auth_method, cpp_strerror(r), allowed_methods, allowed_modes); - auto bad_method = AuthBadMethodFrame::Encode( - auth_meta->auth_method, r, allowed_methods, allowed_modes); return write_frame(bad_method).then([this] { return server_auth(); }); @@ -864,6 +923,9 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo case 1: { auto auth_done = AuthDoneFrame::Encode( conn.peer_global_id, auth_meta->con_mode, reply); + logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", + conn, conn.peer_global_id, + ceph_con_mode_name(auth_meta->con_mode), reply.length()); return write_frame(auth_done).then([this] { ceph_assert(auth_meta); // TODO @@ -875,6 +937,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo // auth more case 0: { auto more = AuthReplyMoreFrame::Encode(reply); + logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", + conn, reply.length()); return write_frame(more).then([this] { return read_main_preamble(); }).then([this] (Tag tag) { @@ -882,6 +946,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo return read_frame_payload(); }).then([this] { auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", + conn, auth_more.auth_payload().length()); return _handle_auth_request(auth_more.auth_payload(), true); }); } @@ -905,7 +971,8 @@ seastar::future<> ProtocolV2::server_auth() }).then([this] { // handle_auth_request() logic auto request = AuthRequestFrame::Decode(rx_segments_data.back()); - logger().trace("{} got AuthRequest(method={}, preferred_modes={}, payload_len={})", + logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={}," + " payload_len={}", conn, request.method(), request.preferred_modes(), request.auth_payload().length()); auth_meta->auth_method = request.method(); @@ -923,6 +990,7 @@ seastar::future<> ProtocolV2::server_auth() seastar::future ProtocolV2::send_wait() { auto wait = WaitFrame::Encode(); + logger().warn("{} WRITE WaitFrame", conn); return write_frame(wait).then([this] { return false; }); @@ -937,29 +1005,30 @@ seastar::future ProtocolV2::handle_existing_connection(SocketConnectionRef ceph_assert(exproto); if (exproto->state == state_t::CLOSING) { - logger().warn("{} existing {} already closed.", conn, *existing); + logger().warn("{} existing connection {} already closed.", conn, *existing); return send_server_ident().then([this] { return true; }); } if (exproto->state == state_t::REPLACING) { - logger().warn("{} existing racing replace happened while replacing: {}", + logger().warn("{} racing replace happened while replacing existing connection {}", conn, *existing); return send_wait(); } if (exproto->peer_global_seq > peer_global_seq) { - logger().warn("{} this is a stale connection, peer_global_seq={}" - "existing->peer_global_seq={} close this connection", - conn, peer_global_seq, exproto->peer_global_seq); + logger().warn("{} this is a stale connection, because peer_global_seq({})" + "< existing->peer_global_seq({}), close this connection" + " in favor of existing connection {}", + conn, peer_global_seq, exproto->peer_global_seq, *existing); dispatch_reset(); abort_in_close(); } if (existing->policy.lossy) { // existing connection can be thrown out in favor of this one - logger().warn("{} existing={} is a lossy channel. Close existing in favor of" + logger().warn("{} existing connection {} is a lossy channel. Close existing in favor of" " this connection", conn, *existing); exproto->dispatch_reset(); exproto->close(); @@ -977,9 +1046,9 @@ seastar::future ProtocolV2::server_connect() return read_frame_payload().then([this] { // handle_client_ident() logic auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back()); - logger().trace("{} received client identification: addrs={} target={}" - " gid={} global_seq={} features_supported={}" - " features_required={} flags={} cookie={}", + logger().debug("{} GOT ClientIdentFrame: addrs={}, target={}," + " gid={}, gs={}, features_supported={}," + " features_required={}, flags={}, cookie={}", conn, client_ident.addrs(), client_ident.target_addr(), client_ident.gid(), client_ident.global_seq(), client_ident.supported_features(), @@ -988,12 +1057,12 @@ seastar::future ProtocolV2::server_connect() if (client_ident.addrs().empty() || client_ident.addrs().front() == entity_addr_t()) { - logger().error("{} oops, client_ident.addrs() is empty", conn); + logger().warn("{} oops, client_ident.addrs() is empty", conn); abort_in_fault(); } if (!messenger.get_myaddrs().contains(client_ident.target_addr())) { - logger().error("{} peer is trying to reach {} which is not us ({})", - conn, client_ident.target_addr(), messenger.get_myaddrs()); + logger().warn("{} peer is trying to reach {} which is not us ({})", + conn, client_ident.target_addr(), messenger.get_myaddrs()); abort_in_fault(); } // TODO: change peer_addr to entity_addrvec_t @@ -1002,7 +1071,7 @@ seastar::future ProtocolV2::server_connect() conn.peer_addr.set_type(paddr.get_type()); conn.peer_addr.set_port(paddr.get_port()); conn.peer_addr.set_nonce(paddr.get_nonce()); - logger().trace("{} got paddr={}, conn.peer_addr={}", conn, paddr, conn.peer_addr); + logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr); conn.target_addr = conn.peer_addr; conn.set_peer_id(client_ident.gid()); @@ -1012,14 +1081,16 @@ seastar::future ProtocolV2::server_connect() (conn.policy.features_required | msgr2_required) & ~(uint64_t)client_ident.supported_features(); if (feat_missing) { - logger().warn("{} peer missing required features {}", conn, feat_missing); auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); + logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", + conn, feat_missing); return write_frame(ident_missing_features).then([this] { return false; }); } connection_features = client_ident.supported_features() & conn.policy.features_supported; + logger().debug("{} UPDATE: connection_features={}", conn, connection_features); peer_global_seq = client_ident.global_seq(); @@ -1030,7 +1101,7 @@ seastar::future ProtocolV2::server_connect() if (existing) { if (existing->protocol->proto_type != proto_t::v2) { - logger().warn("{} existing {} proto version is {}, close", + logger().warn("{} existing connection {} proto version is {}, close existing", conn, *existing, static_cast(existing->protocol->proto_type)); // should unregister the existing from msgr atomically @@ -1060,6 +1131,7 @@ seastar::future ProtocolV2::read_reconnect() seastar::future ProtocolV2::send_retry(uint64_t connect_seq) { auto retry = RetryFrame::Encode(connect_seq); + logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); return write_frame(retry).then([this] { return read_reconnect(); }); @@ -1068,6 +1140,7 @@ seastar::future ProtocolV2::send_retry(uint64_t connect_seq) seastar::future ProtocolV2::send_retry_global(uint64_t global_seq) { auto retry = RetryGlobalFrame::Encode(global_seq); + logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); return write_frame(retry).then([this] { return read_reconnect(); }); @@ -1076,6 +1149,7 @@ seastar::future ProtocolV2::send_retry_global(uint64_t global_seq) seastar::future ProtocolV2::send_reset(bool full) { auto reset = ResetFrame::Encode(full); + logger().warn("{} WRITE ResetFrame: full={}", conn, full); return write_frame(reset).then([this] { return read_main_preamble(); }).then([this] (Tag tag) { @@ -1090,8 +1164,8 @@ seastar::future ProtocolV2::server_reconnect() // handle_reconnect() logic auto reconnect = ReconnectFrame::Decode(rx_segments_data.back()); - logger().trace("{} received reconnect: client_cookie={} server_cookie={}" - " gs={} cs={} ms={}", + logger().debug("{} GOT ReconnectFrame: client_cookie={}, server_cookie={}," + " gs={}, cs={}, msg_seq={}", conn, reconnect.client_cookie(), reconnect.server_cookie(), reconnect.global_seq(), reconnect.connect_seq(), reconnect.msg_seq()); @@ -1111,14 +1185,14 @@ seastar::future ProtocolV2::server_reconnect() if (!existing) { // there is no existing connection therefore cannot reconnect to previous // session - logger().error("{} server_reconnect: no existing connection," - " reseting client", conn); + logger().warn("{} server_reconnect: no existing connection," + " reseting client", conn); return send_reset(true); } if (existing->protocol->proto_type != proto_t::v2) { - logger().warn("{} server_reconnect: existing {} proto version is {}," - "close existing and resetting client.", + logger().warn("{} server_reconnect: existing connection {} proto version is {}," + "close existing and reset client.", conn, *existing, static_cast(existing->protocol->proto_type)); existing->close(); @@ -1129,14 +1203,16 @@ seastar::future ProtocolV2::server_reconnect() ceph_assert(exproto); if (exproto->state == state_t::REPLACING) { - logger().warn("{} server_reconnect: existing racing replace happened while " - " replacing, retry_global. existing={}", conn, *existing); + logger().warn("{} server_reconnect: racing replace happened while " + " replacing existing connection {}, retry global.", + conn, *existing); return send_retry_global(exproto->peer_global_seq); } if (exproto->client_cookie != reconnect.client_cookie()) { - logger().warn("{} server_reconnect: existing={} client cookie mismatch," - " I must have reseted: cc={} rcc={}, reseting client.", + logger().warn("{} server_reconnect:" + " client_cookie mismatch with existing connection {}," + " cc={} rcc={}. I must have reseted, reseting client.", conn, *existing, exproto->client_cookie, reconnect.client_cookie()); return send_reset(conn.policy.resetcheck); } else if (exproto->server_cookie == 0) { @@ -1148,22 +1224,27 @@ seastar::future ProtocolV2::server_reconnect() // - b reconnects to a with cookie X, connect_seq=1 // - a has cookie==0 logger().warn("{} server_reconnect: I was a client and didn't received the" - " server_ident. Asking peer to resume session establishment", - conn); + " server_ident with existing connection {}." + " Asking peer to resume session establishment", + conn, *existing); return send_reset(false); } if (exproto->peer_global_seq > reconnect.global_seq()) { - logger().trace("{} server_reconnect: stale global_seq: sgs={} cgs={}," - " ask client to retry global", - conn, exproto->peer_global_seq, reconnect.global_seq()); + logger().warn("{} server_reconnect: stale global_seq: exist_pgs={} peer_gs={}," + " with existing connection {}," + " ask client to retry global", + conn, exproto->peer_global_seq, reconnect.global_seq(), + *existing); return send_retry_global(exproto->peer_global_seq); } if (exproto->connect_seq > reconnect.connect_seq()) { - logger().trace("{} server_reconnect: stale connect_seq scs={} ccs={}," - " ask client to retry", - conn, exproto->connect_seq, reconnect.connect_seq()); + logger().warn("{} server_reconnect: stale connect_seq exist_cs={} peer_cs={}," + " with existing connection {}," + " ask client to retry", + conn, exproto->connect_seq, reconnect.connect_seq(), + *existing); return send_retry(exproto->connect_seq); } @@ -1173,18 +1254,20 @@ seastar::future ProtocolV2::server_reconnect() !existing->policy.server) { // the existing connection wins logger().warn("{} server_reconnect: reconnect race detected," - " this connection loses to existing={}," + " this connection loses to existing connection {}," " ask client to wait", conn, *existing); return send_wait(); } else { // this connection wins logger().warn("{} server_reconnect: reconnect race detected," - " replacing existing={} socket by this connection's socket", + " replacing existing connection {}" + " socket by this connection's socket", conn, *existing); } } - logger().warn("{} server_reconnect: reconnect to exsiting={}", conn, *existing); + logger().warn("{} server_reconnect: reconnect to exsiting connection {}", + conn, *existing); // everything looks good exproto->connect_seq = reconnect.connect_seq(); @@ -1208,10 +1291,11 @@ void ProtocolV2::execute_accepting() conn.set_peer_type(_peer_type); conn.policy = messenger.get_policy(_peer_type); - logger().trace("{} accept of host type {}, lossy={} server={} standby={} resetcheck={}", - conn, ceph_entity_type_name(_peer_type), - conn.policy.lossy, conn.policy.server, - conn.policy.standby, conn.policy.resetcheck); + logger().info("{} UPDATE: peer_type={}," + " policy(lossy={} server={} standby={} resetcheck={})", + conn, ceph_entity_type_name(_peer_type), + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); return server_auth(); }).then([this] { return read_main_preamble(); @@ -1237,6 +1321,10 @@ void ProtocolV2::execute_accepting() messenger.unaccept_conn( seastar::static_pointer_cast( conn.shared_from_this())); + logger().info("{} accepted: gs={}, pgs={}, cs={}," + " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, conn.in_seq, conn.out_seq); execute_ready(); } else { execute_server_wait(); @@ -1260,6 +1348,7 @@ seastar::future<> ProtocolV2::finish_auth() ceph_assert(record_io); record_io = false; rxbuf.clear(); + logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); return write_frame(sig_frame).then([this] { return read_main_preamble(); }).then([this] (Tag tag) { @@ -1268,6 +1357,7 @@ seastar::future<> ProtocolV2::finish_auth() }).then([this] { // handle_auth_signature() logic auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature()); const auto actual_tx_sig = auth_meta->session_key.empty() ? sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf); @@ -1277,8 +1367,6 @@ seastar::future<> ProtocolV2::finish_auth() conn, actual_tx_sig, sig_frame.signature()); abort_in_fault(); } - logger().trace("{} pre-auth signature success sig_frame.signature()={}", - conn, sig_frame.signature()); txbuf.clear(); }); } @@ -1314,9 +1402,9 @@ seastar::future<> ProtocolV2::send_server_ident() flags, server_cookie); - logger().trace("{} sending server identification: addrs={} gid={}" - " global_seq={} features_supported={} features_required={}" - " flags={} cookie={}", + logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", conn, messenger.get_myaddrs(), messenger.get_myname().num(), gs, conn.policy.features_supported, conn.policy.features_required | msgr2_required, @@ -1542,12 +1630,15 @@ void ProtocolV2::execute_ready() return read_frame_payload().then([this] { // handle_message_ack() logic auto ack = AckFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT AckFrame: seq={}", ack.seq()); handle_message_ack(ack.seq()); }); case Tag::KEEPALIVE2: return read_frame_payload().then([this] { // handle_keepalive2() logic auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back()); + logger().debug("{} GOT KeepAliveFrame: timestamp={}", + conn, keepalive_frame.timestamp()); notify_keepalive_ack(keepalive_frame.timestamp()); conn.set_last_keepalive(seastar::lowres_system_clock::now()); }); @@ -1557,7 +1648,7 @@ void ProtocolV2::execute_ready() auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back()); conn.set_last_keepalive_ack( seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}); - logger().trace("{} got KEEPALIVE_ACK {}", + logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", conn, conn.last_keepalive_ack); }); default: { diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index d386bc5da119..0c9a8d254efa 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -59,7 +59,7 @@ seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) for (auto& addr : my_addrs.v) { addr.nonce = nonce; } - logger().info("listening on {}", my_addrs.front().in4_addr()); + logger().info("{} listening on {}", *this, my_addrs.front().in4_addr()); return container().invoke_on_all([my_addrs](auto& msgr) { msgr.do_bind(my_addrs); }).handle_exception_type([this] (const std::system_error& e) { @@ -233,7 +233,10 @@ seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_f addr.u = peer_addr_for_me.u; addr.set_type(peer_addr_for_me.get_type()); addr.set_port(get_myaddr().get_port()); - return set_myaddrs(entity_addrvec_t{addr}); + return set_myaddrs(entity_addrvec_t{addr}).then([this, peer_addr_for_me] { + logger().debug("{} learned myaddr={} from {}", + *this, get_myaddr(), peer_addr_for_me); + }); } SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc index 1b4170f127a8..410843ccf972 100644 --- a/src/test/crimson/perf_crimson_msgr.cc +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -289,7 +289,6 @@ static seastar::future<> run( return seastar::now(); } seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { - logger().info("{}: connected", *conn); conn_stats.connected_time = mono_clock::now(); return seastar::now(); }