]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: clean up loggings in v2 protocol
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 11 Jul 2019 12:38:25 +0000 (20:38 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Tue, 16 Jul 2019 11:39:53 +0000 (19:39 +0800)
Defind log levels in V2 Protocol:
* error level, something error that cause connection to terminate:
  - fatal errors;
  - bugs;
* warn level: something unusual that identifies connection fault or replacement:
  - unstable network;
  - incompatible peer;
  - auth failure;
  - connection race;
  - connection reset;
* info level, something very important to show connection lifecycle,
  which doesn't happen very frequently;
* debug level, important logs for debugging, including:
  - all the messages sent/received (-->/<==);
  - all the frames exchanged (WRITE/GOT);
  - important fields updated (UPDATE);
  - connection state transitions (TRIGGER);
* trace level, trivial logs showing:
  - the exact bytes being sent/received (SEND/RECV(bytes));
  - detailed information of sub-frames;
  - integrity checks;
  - etc.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketMessenger.cc
src/test/crimson/perf_crimson_msgr.cc

index 47216d096121804532848c119768b543eb357be8..e7b529ce49a7f6d2838ee59cb2a0292da0443b97 100644 (file)
@@ -27,6 +27,29 @@ using namespace ceph::msgr::v2;
 
 namespace {
 
+// TODO: apply the same logging policy to Protocol V1
+// Log levels in V2 Protocol:
+// * error level, something error that cause connection to terminate:
+//   - fatal errors;
+//   - bugs;
+// * warn level: something unusual that identifies connection fault or replacement:
+//   - unstable network;
+//   - incompatible peer;
+//   - auth failure;
+//   - connection race;
+//   - connection reset;
+// * info level, something very important to show connection lifecycle,
+//   which doesn't happen very frequently;
+// * debug level, important logs for debugging, including:
+//   - all the messages sent/received (-->/<==);
+//   - all the frames exchanged (WRITE/GOT);
+//   - important fields updated (UPDATE);
+//   - connection state transitions (TRIGGER);
+// * trace level, trivial logs showing:
+//   - the exact bytes being sent/received (SEND/RECV(bytes));
+//   - detailed information of sub-frames;
+//   - integrity checks;
+//   - etc.
 seastar::logger& logger() {
   return ceph::get_logger(ceph_subsys_ms);
 }
@@ -44,10 +67,10 @@ inline void expect_tag(const Tag& expected,
                        ceph::net::SocketConnection& conn,
                        const char *where) {
   if (actual != expected) {
-    logger().error("{} {} received wrong tag: {}, expected {}",
-                   conn, where,
-                   static_cast<uint32_t>(actual),
-                   static_cast<uint32_t>(expected));
+    logger().warn("{} {} received wrong tag: {}, expected {}",
+                  conn, where,
+                  static_cast<uint32_t>(actual),
+                  static_cast<uint32_t>(expected));
     abort_in_fault();
   }
 }
@@ -55,8 +78,8 @@ inline void expect_tag(const Tag& expected,
 inline void unexpected_tag(const Tag& unexpected,
                            ceph::net::SocketConnection& conn,
                            const char *where) {
-  logger().error("{} {} received unexpected tag: {}",
-                 conn, where, static_cast<uint32_t>(unexpected));
+  logger().warn("{} {} received unexpected tag: {}",
+                conn, where, static_cast<uint32_t>(unexpected));
   abort_in_fault();
 }
 
@@ -110,6 +133,10 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
   conn.target_addr = _peer_addr;
   conn.set_peer_type(_peer_type);
   conn.policy = messenger.get_policy(_peer_type);
+  logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={},"
+                " policy(lossy={}, server={}, standby={}, resetcheck={})",
+                conn, _peer_addr, ceph_entity_type_name(_peer_type), conn.policy.lossy,
+                conn.policy.server, conn.policy.standby, conn.policy.resetcheck);
   messenger.register_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   execute_connecting();
@@ -122,6 +149,7 @@ void ProtocolV2::start_accept(SocketFRef&& sock,
   ceph_assert(!socket);
   conn.target_addr = _peer_addr;
   socket = std::move(sock);
+  logger().info("{} ProtocolV2::start_accept(): peer_addr={}", conn, _peer_addr);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   execute_accepting();
@@ -206,28 +234,30 @@ seastar::future<Tag> ProtocolV2::read_main_preamble()
       // everything through ::Decode is unnecessary.
       const auto& main_preamble = \
         *reinterpret_cast<const preamble_block_t*>(bl.get());
+      logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
+                     conn, bl.size(), (int)main_preamble.tag,
+                     (int)main_preamble.num_segments, main_preamble.crc);
 
       // verify preamble's CRC before any further processing
       const auto rx_crc = ceph_crc32c(0,
         reinterpret_cast<const unsigned char*>(&main_preamble),
         sizeof(main_preamble) - sizeof(main_preamble.crc));
       if (rx_crc != main_preamble.crc) {
-        logger().error("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
-                       conn, rx_crc, main_preamble.crc);
+        logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
+                      conn, rx_crc, main_preamble.crc);
         abort_in_fault();
       }
-      logger().trace("{} read main preamble: tag={}, len={}", conn, (int)main_preamble.tag, bl.size());
 
       // currently we do support between 1 and MAX_NUM_SEGMENTS segments
       if (main_preamble.num_segments < 1 ||
           main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-        logger().error("{} unsupported num_segments={}",
-                       conn, main_preamble.num_segments);
+        logger().warn("{} unsupported num_segments={}",
+                      conn, main_preamble.num_segments);
         abort_in_fault();
       }
       if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-        logger().error("{} num_segments too much: {}",
-                       conn, main_preamble.num_segments);
+        logger().warn("{} num_segments too much: {}",
+                      conn, main_preamble.num_segments);
         abort_in_fault();
       }
 
@@ -235,7 +265,7 @@ seastar::future<Tag> ProtocolV2::read_main_preamble()
       rx_segments_data.clear();
 
       for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
-        logger().trace("{} got new segment: len={} align={}",
+        logger().trace("{} GOT frame segment: len={} align={}",
                        conn, main_preamble.segments[idx].length,
                        main_preamble.segments[idx].alignment);
         rx_segments_desc.emplace_back(main_preamble.segments[idx]);
@@ -263,10 +293,10 @@ seastar::future<> ProtocolV2::read_frame_payload()
       // TODO: create aligned and contiguous buffer from socket
       return read_exactly(cur_rx_desc.length)
       .then([this] (auto tmp_bl) {
+        logger().trace("{} RECV({}) frame segment[{}]",
+                       conn, tmp_bl.size(), rx_segments_data.size());
         bufferlist data;
         data.append(buffer::create(std::move(tmp_bl)));
-        logger().trace("{} read frame segment[{}], length={}",
-                       conn, rx_segments_data.size(), data.length());
         if (session_stream_handlers.rx) {
           // TODO
           ceph_assert(false);
@@ -279,7 +309,7 @@ seastar::future<> ProtocolV2::read_frame_payload()
     ceph_assert(!session_stream_handlers.rx);
     return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
   }).then([this] (auto bl) {
-    logger().trace("{} read frame epilogue length={}", conn, bl.size());
+    logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
 
     __u8 late_flags;
     if (session_stream_handlers.rx) {
@@ -291,9 +321,9 @@ seastar::future<> ProtocolV2::read_frame_payload()
         const __u32 expected_crc = epilogue.crc_values[idx];
         const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
         if (expected_crc != calculated_crc) {
-          logger().error("{} message integrity check failed at index {}:"
-                         " expected_crc={} calculated_crc={}",
-                         conn, (unsigned int)idx, expected_crc, calculated_crc);
+          logger().warn("{} message integrity check failed at index {}:"
+                        " expected_crc={} calculated_crc={}",
+                        conn, (unsigned int)idx, expected_crc, calculated_crc);
           abort_in_fault();
         } else {
           logger().trace("{} message integrity check success at index {}: crc={}",
@@ -302,6 +332,8 @@ seastar::future<> ProtocolV2::read_frame_payload()
       }
       late_flags = epilogue.late_flags;
     }
+    logger().trace("{} GOT frame epilogue: late_flags={}",
+                   conn, (unsigned)late_flags);
 
     // we do have a mechanism that allows transmitter to start sending message
     // and abort after putting entire data field on wire. This will be used by
@@ -317,8 +349,10 @@ template <class F>
 seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
 {
   auto bl = frame.get_buffer(session_stream_handlers);
-  logger().trace("{} write frame: tag={}, len={}", conn,
-                 static_cast<uint32_t>(frame.tag), bl.length());
+  const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
+  logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
+                 conn, bl.length(), (int)main_preamble->tag,
+                 (int)main_preamble->num_segments, main_preamble->crc);
   if (flush) {
     return write_flush(std::move(bl));
   } else {
@@ -333,7 +367,7 @@ void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool
                    conn, get_state_name(state));
     ceph_assert(false);
   }
-  logger().trace("{} trigger {}, was {}",
+  logger().debug("{} TRIGGER {}, was {}",
                  conn, get_state_name(_state), get_state_name(state));
   state = _state;
   set_write_state(_write_state);
@@ -399,8 +433,14 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
 
   bufferlist bl;
   bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
-  encode((uint16_t)banner_payload.length(), bl, 0);
+  auto len_payload = static_cast<uint16_t>(banner_payload.length());
+  encode(len_payload, bl, 0);
   bl.claim_append(banner_payload);
+  logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
+                 "required={}, banner=\"{}\"",
+                 conn, bl.length(), len_payload,
+                 CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+                 CEPH_BANNER_V2_PREFIX);
   return write_flush(std::move(bl)).then([this] {
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
@@ -408,12 +448,15 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
     }).then([this] (auto bl) {
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+      logger().debug("{} RECV({}) banner: \"{}\"",
+                     conn, bl.size(),
+                     std::string((const char*)bl.get(), banner_prefix_len));
 
       if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
         if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
-          logger().error("{} peer is using V1 protocol", conn);
+          logger().warn("{} peer is using V1 protocol", conn);
         } else {
-          logger().error("{} peer sent bad banner", conn);
+          logger().warn("{} peer sent bad banner", conn);
         }
         abort_in_fault();
       }
@@ -426,9 +469,10 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
       try {
         decode(payload_len, ti);
       } catch (const buffer::error &e) {
-        logger().error("{} decode banner payload len failed", conn);
+        logger().warn("{} decode banner payload len failed", conn);
         abort_in_fault();
       }
+      logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       return read(payload_len);
     }).then([this] (bufferlist bl) {
       // 4. process peer banner_payload and send HelloFrame
@@ -439,11 +483,12 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
         decode(peer_supported_features, p);
         decode(peer_required_features, p);
       } catch (const buffer::error &e) {
-        logger().error("{} decode banner payload failed", conn);
+        logger().warn("{} decode banner payload failed", conn);
         abort_in_fault();
       }
-      logger().trace("{} supported={} required={}",
-                     conn, peer_supported_features, peer_required_features);
+      logger().debug("{} RECV({}) banner features: supported={} required={}",
+                     conn, bl.length(),
+                     peer_supported_features, peer_required_features);
 
       // Check feature bit compatibility
       uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
@@ -467,6 +512,9 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
 
       auto hello = HelloFrame::Encode(messenger.get_mytype(),
                                       conn.target_addr);
+      logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+                     conn, ceph_entity_type_name(messenger.get_mytype()),
+                     conn.target_addr);
       return write_frame(hello);
     }).then([this] {
       //5. read peer HelloFrame
@@ -477,7 +525,7 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
     }).then([this] {
       // 6. process peer HelloFrame
       auto hello = HelloFrame::Decode(rx_segments_data.back());
-      logger().trace("{} received hello: peer_type={} peer_addr_for_me={}",
+      logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
                      conn, ceph_entity_type_name(hello.entity_type()),
                      hello.peer_addr());
       return seastar::make_ready_future<entity_type_t, entity_addr_t>(
@@ -496,8 +544,8 @@ seastar::future<> ProtocolV2::handle_auth_reply()
         return read_frame_payload().then([this] {
           // handle_auth_bad_method() logic
           auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
-          logger().warn("{} got AuthBadMethod, method={} reslt={}, "
-                        "allowed methods={}, allowed modes={}",
+          logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
+                        "allowed_methods={}, allowed_modes={}",
                         conn, bad_method.method(), cpp_strerror(bad_method.result()),
                         bad_method.allowed_methods(), bad_method.allowed_modes());
           ceph_assert(messenger.get_auth_client());
@@ -506,8 +554,8 @@ seastar::future<> ProtocolV2::handle_auth_reply()
               bad_method.method(), bad_method.result(),
               bad_method.allowed_methods(), bad_method.allowed_modes());
           if (r < 0) {
-            logger().error("{} auth_client handle_auth_bad_method returned {}",
-                           conn, r);
+            logger().warn("{} auth_client handle_auth_bad_method returned {}",
+                          conn, r);
             abort_in_fault();
           }
           return client_auth(bad_method.allowed_methods());
@@ -516,13 +564,15 @@ seastar::future<> ProtocolV2::handle_auth_reply()
         return read_frame_payload().then([this] {
           // handle_auth_reply_more() logic
           auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
-          logger().trace("{} auth reply more len={}",
+          logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
                          conn, auth_more.auth_payload().length());
           ceph_assert(messenger.get_auth_client());
           // let execute_connecting() take care of the thrown exception
           auto reply = messenger.get_auth_client()->handle_auth_reply_more(
             conn.shared_from_this(), auth_meta, auth_more.auth_payload());
           auto more_reply = AuthRequestMoreFrame::Encode(reply);
+          logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
+                         conn, reply.length());
           return write_frame(more_reply);
         }).then([this] {
           return handle_auth_reply();
@@ -531,6 +581,10 @@ seastar::future<> ProtocolV2::handle_auth_reply()
         return read_frame_payload().then([this] {
           // handle_auth_done() logic
           auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+          logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+                         conn, auth_done.global_id(),
+                         ceph_con_mode_name(auth_done.con_mode()),
+                         auth_done.auth_payload().length());
           ceph_assert(messenger.get_auth_client());
           int r = messenger.get_auth_client()->handle_auth_done(
               conn.shared_from_this(), auth_meta,
@@ -538,7 +592,7 @@ seastar::future<> ProtocolV2::handle_auth_reply()
               auth_done.con_mode(),
               auth_done.auth_payload());
           if (r < 0) {
-            logger().error("{} auth_client handle_auth_done returned {}", conn, r);
+            logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
             abort_in_fault();
           }
           auth_meta->con_mode = auth_done.con_mode();
@@ -565,6 +619,9 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
       messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
     auth_meta->auth_method = auth_method;
     auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
+    logger().debug("{} WRITE AuthRequestFrame: method={},"
+                   " preferred_modes={}, payload_len={}",
+                   conn, auth_method, preferred_modes, bl.length());
     return write_frame(frame).then([this] {
       return handle_auth_reply();
     });
@@ -580,7 +637,7 @@ seastar::future<bool> ProtocolV2::process_wait()
 {
   return read_frame_payload().then([this] {
     // handle_wait() logic
-    logger().trace("{} received WAIT (connection race)", conn);
+    logger().warn("{} GOT WaitFrame", conn);
     WaitFrame::Decode(rx_segments_data.back());
     return false;
   });
@@ -597,7 +654,6 @@ seastar::future<bool> ProtocolV2::client_connect()
   entity_addr_t a;
   a.u.sa.sa_family = AF_INET;
   a.set_type(entity_addr_t::TYPE_MSGR2);
-  logger().debug("{} learn from addr {}", conn, a);
   return messenger.learned_addr(a).then([this] {
     uint64_t flags = 0;
     if (conn.policy.lossy) {
@@ -613,9 +669,9 @@ seastar::future<bool> ProtocolV2::client_connect()
         conn.policy.features_required | msgr2_required, flags,
         client_cookie);
 
-    logger().trace("{} sending identification: addrs={} target={} gid={}"
-                   " global_seq={} features_supported={} features_required={}"
-                   " flags={} cookie={}",
+    logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+                   " gs={}, features_supported={}, features_required={},"
+                   " flags={}, cookie={}",
                    conn, messenger.get_myaddrs(), conn.target_addr,
                    messenger.get_myname().num(), global_seq,
                    conn.policy.features_supported,
@@ -630,8 +686,9 @@ seastar::future<bool> ProtocolV2::client_connect()
         return read_frame_payload().then([this] {
           // handle_ident_missing_features() logic
           auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
-          logger().error("{} client does not support all server features: {}",
-                         conn, ident_missing.features());
+          logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
+                        " (client does not support all server features)",
+                        conn, ident_missing.features());
           abort_in_fault();
           // won't be executed
           return false;
@@ -642,10 +699,10 @@ seastar::future<bool> ProtocolV2::client_connect()
         return read_frame_payload().then([this] {
           // handle_server_ident() logic
           auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
-          logger().trace("{} received server identification:"
-                         " addrs={} gid={} global_seq={}"
-                         " features_supported={} features_required={}"
-                         " flags={} cookie={}",
+          logger().debug("{} GOT ServerIdentFrame:"
+                         " addrs={}, gid={}, gs={},"
+                         " features_supported={}, features_required={},"
+                         " flags={}, cookie={}",
                          conn,
                          server_ident.addrs(), server_ident.gid(),
                          server_ident.global_seq(),
@@ -675,8 +732,6 @@ seastar::future<bool> ProtocolV2::client_connect()
           ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY);
           conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
           // TODO: backoff = utime_t();
-          logger().trace("{} connect success {}, lossy={}, features={}",
-                         conn, connect_seq, conn.policy.lossy, conn.features);
 
           return dispatcher.ms_handle_connect(
               seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
@@ -705,8 +760,8 @@ seastar::future<bool> ProtocolV2::client_reconnect()
                                           global_seq,
                                           connect_seq,
                                           conn.in_seq);
-  logger().trace("{} reconnect to session: client_cookie={}"
-                 " server_cookie={} gs={} cs={} ms={}",
+  logger().debug("{} WRITE ReconnectFrame: client_cookie={},"
+                 " server_cookie={}, gs={}, cs={}, msg_seq={}",
                  conn, client_cookie, server_cookie,
                  global_seq, connect_seq, conn.in_seq);
   return write_frame(reconnect).then([this] {
@@ -717,26 +772,27 @@ seastar::future<bool> ProtocolV2::client_reconnect()
         return read_frame_payload().then([this] {
           // handle_session_retry_global() logic
           auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+          logger().warn("{} GOT RetryGlobalFrame: gs={}",
+                        conn, retry.global_seq());
           global_seq = messenger.get_global_seq(retry.global_seq());
-          logger().warn("{} received session retry global "
-                        "global_seq={}, choose new gs={}",
-                        conn, retry.global_seq(), global_seq);
+          logger().warn("{} UPDATE: gs={}", conn, global_seq);
           return client_reconnect();
         });
       case Tag::SESSION_RETRY:
         return read_frame_payload().then([this] {
           // handle_session_retry() logic
           auto retry = RetryFrame::Decode(rx_segments_data.back());
+          logger().warn("{} GOT RetryFrame: cs={}",
+                        conn, retry.connect_seq());
           connect_seq = retry.connect_seq() + 1;
-          logger().warn("{} received session retry connect_seq={}, inc to cs={}",
-                        conn, retry.connect_seq(), connect_seq);
+          logger().warn("{} UPDATE: cs={}", conn, connect_seq);
           return client_reconnect();
         });
       case Tag::SESSION_RESET:
         return read_frame_payload().then([this] {
           // handle_session_reset() logic
           auto reset = ResetFrame::Decode(rx_segments_data.back());
-          logger().warn("{} received session reset full={}", reset.full());
+          logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
           reset_session(reset.full());
           return client_connect();
         });
@@ -746,10 +802,8 @@ seastar::future<bool> ProtocolV2::client_reconnect()
         return read_frame_payload().then([this] {
           // handle_reconnect_ok() logic
           auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
-          logger().trace("{} received reconnect ok:"
-                         "sms={}, lossy={}, features={}",
-                         conn, reconnect_ok.msg_seq(),
-                         connect_seq, conn.policy.lossy, conn.features);
+          logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
+                         conn, reconnect_ok.msg_seq());
           // TODO
           // discard_requeued_up_to()
           // backoff = utime_t();
@@ -778,12 +832,14 @@ void ProtocolV2::execute_connecting()
   seastar::with_gate(pending_dispatch, [this] {
       enable_recording();
       global_seq = messenger.get_global_seq();
+      logger().debug("{} UPDATE: gs={}", conn, global_seq);
       return Socket::connect(conn.peer_addr)
         .then([this](SocketFRef sock) {
+          logger().debug("{} socket connected", conn);
           socket = std::move(sock);
           if (state == state_t::CLOSING) {
             return socket->close().then([this] {
-              logger().info("{} is closed during Socket::connect()", conn);
+              logger().warn("{} is closed during Socket::connect()", conn);
               abort_in_fault();
             });
           }
@@ -793,15 +849,14 @@ void ProtocolV2::execute_connecting()
         }).then([this] (entity_type_t _peer_type,
                         entity_addr_t _my_addr_from_peer) {
           if (conn.get_peer_type() != _peer_type) {
-            logger().debug("{} connection peer type does not match what peer advertises {} != {}",
-                           conn, ceph_entity_type_name(conn.get_peer_type()),
-                           ceph_entity_type_name(_peer_type));
+            logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+                          conn, ceph_entity_type_name(conn.get_peer_type()),
+                          ceph_entity_type_name(_peer_type));
             dispatch_reset();
             abort_in_close();
           }
           if (messenger.get_myaddrs().empty() ||
               messenger.get_myaddrs().front().is_blank_ip()) {
-            logger().trace("peer {} says I am {}", conn.target_addr, _my_addr_from_peer);
             return messenger.learned_addr(_my_addr_from_peer);
           } else {
             return seastar::now();
@@ -820,6 +875,10 @@ void ProtocolV2::execute_connecting()
           }
         }).then([this] (bool proceed_or_wait) {
           if (proceed_or_wait) {
+            logger().info("{} connected: gs={}, pgs={}, cs={},"
+                          " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+                          conn, global_seq, peer_global_seq, connect_seq,
+                          client_cookie, server_cookie, conn.in_seq, conn.out_seq);
             execute_ready();
           } else {
             execute_wait();
@@ -839,12 +898,12 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r)
   ceph_assert(r < 0);
   auto [allowed_methods, allowed_modes] =
       messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
-  logger().warn("{} send AuthBadMethod(auth_method={}, r={}, "
+  auto bad_method = AuthBadMethodFrame::Encode(
+      auth_meta->auth_method, r, allowed_methods, allowed_modes);
+  logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
                 "allowed_methods={}, allowed_modes={})",
                 conn, auth_meta->auth_method, cpp_strerror(r),
                 allowed_methods, allowed_modes);
-  auto bad_method = AuthBadMethodFrame::Encode(
-      auth_meta->auth_method, r, allowed_methods, allowed_modes);
   return write_frame(bad_method).then([this] {
     return server_auth();
   });
@@ -864,6 +923,9 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
    case 1: {
     auto auth_done = AuthDoneFrame::Encode(
         conn.peer_global_id, auth_meta->con_mode, reply);
+    logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+                   conn, conn.peer_global_id,
+                   ceph_con_mode_name(auth_meta->con_mode), reply.length());
     return write_frame(auth_done).then([this] {
       ceph_assert(auth_meta);
       // TODO
@@ -875,6 +937,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
    // auth more
    case 0: {
     auto more = AuthReplyMoreFrame::Encode(reply);
+    logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
+                   conn, reply.length());
     return write_frame(more).then([this] {
       return read_main_preamble();
     }).then([this] (Tag tag) {
@@ -882,6 +946,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
       return read_frame_payload();
     }).then([this] {
       auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+      logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
+                     conn, auth_more.auth_payload().length());
       return _handle_auth_request(auth_more.auth_payload(), true);
     });
    }
@@ -905,7 +971,8 @@ seastar::future<> ProtocolV2::server_auth()
   }).then([this] {
     // handle_auth_request() logic
     auto request = AuthRequestFrame::Decode(rx_segments_data.back());
-    logger().trace("{} got AuthRequest(method={}, preferred_modes={}, payload_len={})",
+    logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
+                   " payload_len={}",
                    conn, request.method(), request.preferred_modes(),
                    request.auth_payload().length());
     auth_meta->auth_method = request.method();
@@ -923,6 +990,7 @@ seastar::future<> ProtocolV2::server_auth()
 seastar::future<bool> ProtocolV2::send_wait()
 {
   auto wait = WaitFrame::Encode();
+  logger().warn("{} WRITE WaitFrame", conn);
   return write_frame(wait).then([this] {
     return false;
   });
@@ -937,29 +1005,30 @@ seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef
   ceph_assert(exproto);
 
   if (exproto->state == state_t::CLOSING) {
-    logger().warn("{} existing {} already closed.", conn, *existing);
+    logger().warn("{} existing connection {} already closed.", conn, *existing);
     return send_server_ident().then([this] {
       return true;
     });
   }
 
   if (exproto->state == state_t::REPLACING) {
-    logger().warn("{} existing racing replace happened while replacing: {}",
+    logger().warn("{} racing replace happened while replacing existing connection {}",
                   conn, *existing);
     return send_wait();
   }
 
   if (exproto->peer_global_seq > peer_global_seq) {
-    logger().warn("{} this is a stale connection, peer_global_seq={}"
-                  "existing->peer_global_seq={} close this connection",
-                  conn, peer_global_seq, exproto->peer_global_seq);
+    logger().warn("{} this is a stale connection, because peer_global_seq({})"
+                  "< existing->peer_global_seq({}), close this connection"
+                  " in favor of existing connection {}",
+                  conn, peer_global_seq, exproto->peer_global_seq, *existing);
     dispatch_reset();
     abort_in_close();
   }
 
   if (existing->policy.lossy) {
     // existing connection can be thrown out in favor of this one
-    logger().warn("{} existing={} is a lossy channel. Close existing in favor of"
+    logger().warn("{} existing connection {} is a lossy channel. Close existing in favor of"
                   " this connection", conn, *existing);
     exproto->dispatch_reset();
     exproto->close();
@@ -977,9 +1046,9 @@ seastar::future<bool> ProtocolV2::server_connect()
   return read_frame_payload().then([this] {
     // handle_client_ident() logic
     auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
-    logger().trace("{} received client identification: addrs={} target={}"
-                   " gid={} global_seq={} features_supported={}"
-                   " features_required={} flags={} cookie={}",
+    logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
+                   " gid={}, gs={}, features_supported={},"
+                   " features_required={}, flags={}, cookie={}",
                    conn, client_ident.addrs(), client_ident.target_addr(),
                    client_ident.gid(), client_ident.global_seq(),
                    client_ident.supported_features(),
@@ -988,12 +1057,12 @@ seastar::future<bool> ProtocolV2::server_connect()
 
     if (client_ident.addrs().empty() ||
         client_ident.addrs().front() == entity_addr_t()) {
-      logger().error("{} oops, client_ident.addrs() is empty", conn);
+      logger().warn("{} oops, client_ident.addrs() is empty", conn);
       abort_in_fault();
     }
     if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
-      logger().error("{} peer is trying to reach {} which is not us ({})",
-                     conn, client_ident.target_addr(), messenger.get_myaddrs());
+      logger().warn("{} peer is trying to reach {} which is not us ({})",
+                    conn, client_ident.target_addr(), messenger.get_myaddrs());
       abort_in_fault();
     }
     // TODO: change peer_addr to entity_addrvec_t
@@ -1002,7 +1071,7 @@ seastar::future<bool> ProtocolV2::server_connect()
     conn.peer_addr.set_type(paddr.get_type());
     conn.peer_addr.set_port(paddr.get_port());
     conn.peer_addr.set_nonce(paddr.get_nonce());
-    logger().trace("{} got paddr={}, conn.peer_addr={}", conn, paddr, conn.peer_addr);
+    logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
     conn.target_addr = conn.peer_addr;
 
     conn.set_peer_id(client_ident.gid());
@@ -1012,14 +1081,16 @@ seastar::future<bool> ProtocolV2::server_connect()
       (conn.policy.features_required | msgr2_required) &
       ~(uint64_t)client_ident.supported_features();
     if (feat_missing) {
-      logger().warn("{} peer missing required features {}", conn, feat_missing);
       auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+      logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
+                    conn, feat_missing);
       return write_frame(ident_missing_features).then([this] {
         return false;
       });
     }
     connection_features =
         client_ident.supported_features() & conn.policy.features_supported;
+    logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
 
     peer_global_seq = client_ident.global_seq();
 
@@ -1030,7 +1101,7 @@ seastar::future<bool> ProtocolV2::server_connect()
 
     if (existing) {
       if (existing->protocol->proto_type != proto_t::v2) {
-        logger().warn("{} existing {} proto version is {}, close",
+        logger().warn("{} existing connection {} proto version is {}, close existing",
                       conn, *existing,
                       static_cast<int>(existing->protocol->proto_type));
         // should unregister the existing from msgr atomically
@@ -1060,6 +1131,7 @@ seastar::future<bool> ProtocolV2::read_reconnect()
 seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
 {
   auto retry = RetryFrame::Encode(connect_seq);
+  logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
   return write_frame(retry).then([this] {
     return read_reconnect();
   });
@@ -1068,6 +1140,7 @@ seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
 seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
 {
   auto retry = RetryGlobalFrame::Encode(global_seq);
+  logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
   return write_frame(retry).then([this] {
     return read_reconnect();
   });
@@ -1076,6 +1149,7 @@ seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
 seastar::future<bool> ProtocolV2::send_reset(bool full)
 {
   auto reset = ResetFrame::Encode(full);
+  logger().warn("{} WRITE ResetFrame: full={}", conn, full);
   return write_frame(reset).then([this] {
     return read_main_preamble();
   }).then([this] (Tag tag) {
@@ -1090,8 +1164,8 @@ seastar::future<bool> ProtocolV2::server_reconnect()
     // handle_reconnect() logic
     auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
 
-    logger().trace("{} received reconnect: client_cookie={} server_cookie={}"
-                   " gs={} cs={} ms={}",
+    logger().debug("{} GOT ReconnectFrame: client_cookie={}, server_cookie={},"
+                   " gs={}, cs={}, msg_seq={}",
                    conn, reconnect.client_cookie(), reconnect.server_cookie(),
                    reconnect.global_seq(), reconnect.connect_seq(),
                    reconnect.msg_seq());
@@ -1111,14 +1185,14 @@ seastar::future<bool> ProtocolV2::server_reconnect()
     if (!existing) {
       // there is no existing connection therefore cannot reconnect to previous
       // session
-      logger().error("{} server_reconnect: no existing connection,"
-                     " reseting client", conn);
+      logger().warn("{} server_reconnect: no existing connection,"
+                    " reseting client", conn);
       return send_reset(true);
     }
 
     if (existing->protocol->proto_type != proto_t::v2) {
-      logger().warn("{} server_reconnect: existing {} proto version is {},"
-                    "close existing and resetting client.",
+      logger().warn("{} server_reconnect: existing connection {} proto version is {},"
+                    "close existing and reset client.",
                     conn, *existing,
                     static_cast<int>(existing->protocol->proto_type));
       existing->close();
@@ -1129,14 +1203,16 @@ seastar::future<bool> ProtocolV2::server_reconnect()
     ceph_assert(exproto);
 
     if (exproto->state == state_t::REPLACING) {
-      logger().warn("{} server_reconnect: existing racing replace happened while "
-                    " replacing, retry_global. existing={}", conn, *existing);
+      logger().warn("{} server_reconnect: racing replace happened while "
+                    " replacing existing connection {}, retry global.",
+                    conn, *existing);
       return send_retry_global(exproto->peer_global_seq);
     }
 
     if (exproto->client_cookie != reconnect.client_cookie()) {
-      logger().warn("{} server_reconnect: existing={} client cookie mismatch,"
-                    " I must have reseted: cc={} rcc={}, reseting client.",
+      logger().warn("{} server_reconnect:"
+                    " client_cookie mismatch with existing connection {},"
+                    " cc={} rcc={}. I must have reseted, reseting client.",
                     conn, *existing, exproto->client_cookie, reconnect.client_cookie());
       return send_reset(conn.policy.resetcheck);
     } else if (exproto->server_cookie == 0) {
@@ -1148,22 +1224,27 @@ seastar::future<bool> ProtocolV2::server_reconnect()
       //   - b reconnects to a with cookie X, connect_seq=1
       //   - a has cookie==0
       logger().warn("{} server_reconnect: I was a client and didn't received the"
-                    " server_ident. Asking peer to resume session establishment",
-                    conn);
+                    " server_ident with existing connection {}."
+                    " Asking peer to resume session establishment",
+                    conn, *existing);
       return send_reset(false);
     }
 
     if (exproto->peer_global_seq > reconnect.global_seq()) {
-      logger().trace("{} server_reconnect: stale global_seq: sgs={} cgs={},"
-                     " ask client to retry global",
-                     conn, exproto->peer_global_seq, reconnect.global_seq());
+      logger().warn("{} server_reconnect: stale global_seq: exist_pgs={} peer_gs={},"
+                    " with existing connection {},"
+                    " ask client to retry global",
+                    conn, exproto->peer_global_seq, reconnect.global_seq(),
+                    *existing);
       return send_retry_global(exproto->peer_global_seq);
     }
 
     if (exproto->connect_seq > reconnect.connect_seq()) {
-      logger().trace("{} server_reconnect: stale connect_seq scs={} ccs={},"
-                     " ask client to retry",
-                     conn, exproto->connect_seq, reconnect.connect_seq());
+      logger().warn("{} server_reconnect: stale connect_seq exist_cs={} peer_cs={},"
+                    " with existing connection {},"
+                    " ask client to retry",
+                    conn, exproto->connect_seq, reconnect.connect_seq(),
+                    *existing);
       return send_retry(exproto->connect_seq);
     }
 
@@ -1173,18 +1254,20 @@ seastar::future<bool> ProtocolV2::server_reconnect()
           !existing->policy.server) {
         // the existing connection wins
         logger().warn("{} server_reconnect: reconnect race detected,"
-                      " this connection loses to existing={},"
+                      " this connection loses to existing connection {},"
                       " ask client to wait", conn, *existing);
         return send_wait();
       } else {
         // this connection wins
         logger().warn("{} server_reconnect: reconnect race detected,"
-                      " replacing existing={} socket by this connection's socket",
+                      " replacing existing connection {}"
+                      " socket by this connection's socket",
                       conn, *existing);
       }
     }
 
-    logger().warn("{} server_reconnect: reconnect to exsiting={}", conn, *existing);
+    logger().warn("{} server_reconnect: reconnect to exsiting connection {}",
+                  conn, *existing);
 
     // everything looks good
     exproto->connect_seq = reconnect.connect_seq();
@@ -1208,10 +1291,11 @@ void ProtocolV2::execute_accepting()
           conn.set_peer_type(_peer_type);
 
           conn.policy = messenger.get_policy(_peer_type);
-          logger().trace("{} accept of host type {}, lossy={} server={} standby={} resetcheck={}",
-                         conn, ceph_entity_type_name(_peer_type),
-                         conn.policy.lossy, conn.policy.server,
-                         conn.policy.standby, conn.policy.resetcheck);
+          logger().info("{} UPDATE: peer_type={},"
+                        " policy(lossy={} server={} standby={} resetcheck={})",
+                        conn, ceph_entity_type_name(_peer_type),
+                        conn.policy.lossy, conn.policy.server,
+                        conn.policy.standby, conn.policy.resetcheck);
           return server_auth();
         }).then([this] {
           return read_main_preamble();
@@ -1237,6 +1321,10 @@ void ProtocolV2::execute_accepting()
             messenger.unaccept_conn(
               seastar::static_pointer_cast<SocketConnection>(
                 conn.shared_from_this()));
+            logger().info("{} accepted: gs={}, pgs={}, cs={},"
+                          " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+                          conn, global_seq, peer_global_seq, connect_seq,
+                          client_cookie, server_cookie, conn.in_seq, conn.out_seq);
             execute_ready();
           } else {
             execute_server_wait();
@@ -1260,6 +1348,7 @@ seastar::future<> ProtocolV2::finish_auth()
   ceph_assert(record_io);
   record_io = false;
   rxbuf.clear();
+  logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
   return write_frame(sig_frame).then([this] {
     return read_main_preamble();
   }).then([this] (Tag tag) {
@@ -1268,6 +1357,7 @@ seastar::future<> ProtocolV2::finish_auth()
   }).then([this] {
     // handle_auth_signature() logic
     auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+    logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
 
     const auto actual_tx_sig = auth_meta->session_key.empty() ?
       sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
@@ -1277,8 +1367,6 @@ seastar::future<> ProtocolV2::finish_auth()
                     conn, actual_tx_sig, sig_frame.signature());
       abort_in_fault();
     }
-    logger().trace("{} pre-auth signature success sig_frame.signature()={}",
-                   conn, sig_frame.signature());
     txbuf.clear();
   });
 }
@@ -1314,9 +1402,9 @@ seastar::future<> ProtocolV2::send_server_ident()
           flags,
           server_cookie);
 
-  logger().trace("{} sending server identification: addrs={} gid={}"
-                 " global_seq={} features_supported={} features_required={}"
-                 " flags={} cookie={}",
+  logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+                 " gs={}, features_supported={}, features_required={},"
+                 " flags={}, cookie={}",
                  conn, messenger.get_myaddrs(), messenger.get_myname().num(),
                  gs, conn.policy.features_supported,
                  conn.policy.features_required | msgr2_required,
@@ -1542,12 +1630,15 @@ void ProtocolV2::execute_ready()
             return read_frame_payload().then([this] {
               // handle_message_ack() logic
               auto ack = AckFrame::Decode(rx_segments_data.back());
+              logger().debug("{} GOT AckFrame: seq={}", ack.seq());
               handle_message_ack(ack.seq());
             });
           case Tag::KEEPALIVE2:
             return read_frame_payload().then([this] {
               // handle_keepalive2() logic
               auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+                             conn, keepalive_frame.timestamp());
               notify_keepalive_ack(keepalive_frame.timestamp());
               conn.set_last_keepalive(seastar::lowres_system_clock::now());
             });
@@ -1557,7 +1648,7 @@ void ProtocolV2::execute_ready()
               auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
               conn.set_last_keepalive_ack(
                 seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
-              logger().trace("{} got KEEPALIVE_ACK {}",
+              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                              conn, conn.last_keepalive_ack);
             });
           default: {
index d386bc5da119564954bfb0293b1c8b272fb5a052..0c9a8d254efa1c09109ff6c4115d93f756558a0e 100644 (file)
@@ -59,7 +59,7 @@ seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
   for (auto& addr : my_addrs.v) {
     addr.nonce = nonce;
   }
-  logger().info("listening on {}", my_addrs.front().in4_addr());
+  logger().info("{} listening on {}", *this, my_addrs.front().in4_addr());
   return container().invoke_on_all([my_addrs](auto& msgr) {
     msgr.do_bind(my_addrs);
   }).handle_exception_type([this] (const std::system_error& e) {
@@ -233,7 +233,10 @@ seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_f
   addr.u = peer_addr_for_me.u;
   addr.set_type(peer_addr_for_me.get_type());
   addr.set_port(get_myaddr().get_port());
-  return set_myaddrs(entity_addrvec_t{addr});
+  return set_myaddrs(entity_addrvec_t{addr}).then([this, peer_addr_for_me] {
+    logger().debug("{} learned myaddr={} from {}",
+                   *this, get_myaddr(), peer_addr_for_me);
+  });
 }
 
 SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
index 1b4170f127a855c206ac4a335a7b0247a481e496..410843ccf9727f138ff5bdc50e0fe2aa182df9e8 100644 (file)
@@ -289,7 +289,6 @@ static seastar::future<> run(
         return seastar::now();
       }
       seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
-        logger().info("{}: connected", *conn);
         conn_stats.connected_time = mono_clock::now();
         return seastar::now();
       }