]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: organizing log messages
authorRicardo Dias <rdias@suse.com>
Tue, 22 Jan 2019 17:25:46 +0000 (17:25 +0000)
committerRicardo Dias <rdias@suse.com>
Wed, 23 Jan 2019 13:59:29 +0000 (13:59 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/AsyncConnection.cc
src/msg/async/ProtocolV2.cc

index 13711c98bc933a3ae4ba2c84c4467a7ba3afa421..e6727d5872bbcea6de0f79e050efca2353f37e79 100644 (file)
@@ -650,7 +650,7 @@ void AsyncConnection::mark_down()
 
 void AsyncConnection::handle_write()
 {
-  ldout(async_msgr->cct, 4) << __func__ << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << dendl;
   protocol->write_event();
 }
 
index 392c61690bb54c042cc6aefaf9d9a01e58240ad9..dc81e23e822b2af96c2c9fd628894ca70b998ecf 100644 (file)
@@ -439,6 +439,7 @@ ProtocolV2::~ProtocolV2() {
 }
 
 void ProtocolV2::connect() {
+  ldout(cct, 1) << __func__ << dendl;
   state = START_CONNECT;
 
   got_bad_method = 0;
@@ -448,7 +449,10 @@ void ProtocolV2::connect() {
   }
 }
 
-void ProtocolV2::accept() { state = START_ACCEPT; }
+void ProtocolV2::accept() {
+  ldout(cct, 1) << __func__ << dendl;
+  state = START_ACCEPT;
+}
 
 bool ProtocolV2::is_connected() { return can_write; }
 
@@ -477,7 +481,7 @@ void ProtocolV2::discard_out_queue() {
 }
 
 void ProtocolV2::reset_session() {
-  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 1) << __func__ << dendl;
 
   std::lock_guard<std::mutex> l(connection->write_lock);
   if (connection->delay_state) {
@@ -501,7 +505,7 @@ void ProtocolV2::reset_session() {
 }
 
 void ProtocolV2::stop() {
-  ldout(cct, 2) << __func__ << dendl;
+  ldout(cct, 1) << __func__ << dendl;
   if (state == CLOSED) {
     return;
   }
@@ -530,8 +534,9 @@ void ProtocolV2::requeue_sent() {
   while (!sent.empty()) {
     Message *m = sent.back();
     sent.pop_back();
-    ldout(cct, 10) << __func__ << " " << *m << " for resend "
-                   << " (" << m->get_seq() << ")" << dendl;
+    ldout(cct, 5) << __func__ << " requeueing message m=" << m
+                  << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
+                  << *m << dendl;
     rq.push_front(make_pair(bufferlist(), m));
   }
 }
@@ -547,9 +552,9 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   while (!rq.empty()) {
     pair<bufferlist, Message *> p = rq.front();
     if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
-    ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
-                   << p.second->get_seq() << " <= " << seq << ", discarding"
-                   << dendl;
+    ldout(cct, 5) << __func__ << " discarding message m=" << p.second
+                  << " seq=" << p.second->get_seq() << " ack_seq=" << seq << " "
+                  << *(p.second) << dendl;
     p.second->put();
     rq.pop_front();
     count++;
@@ -612,7 +617,7 @@ CtPtr ProtocolV2::_fault() {
 
   if (connection->policy.lossy && state != START_CONNECT &&
       state != CONNECTING) {
-    ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
+    ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
     stop();
     connection->dispatch_queue->queue_reset(connection);
     return nullptr;
@@ -626,7 +631,7 @@ CtPtr ProtocolV2::_fault() {
 
   if (out_queue.empty() && state >= START_ACCEPT &&
       state <= ACCEPTING_SESSION && !replacing) {
-    ldout(cct, 10) << __func__ << " with nothing to send and in the half "
+    ldout(cct, 2) << __func__ << " with nothing to send and in the half "
                    << " accept state just closed" << dendl;
     connection->write_lock.unlock();
     stop();
@@ -640,8 +645,8 @@ CtPtr ProtocolV2::_fault() {
 
   if (connection->policy.standby && out_queue.empty() && !keepalive &&
       state != WAIT) {
-    ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
-                   << dendl;
+    ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
+                  << dendl;
     state = STANDBY;
     connection->write_lock.unlock();
     return nullptr;
@@ -652,10 +657,10 @@ CtPtr ProtocolV2::_fault() {
   if (state != START_CONNECT && state != CONNECTING && state != WAIT) {
     // policy maybe empty when state is in accept
     if (connection->policy.server) {
-      ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
+      ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
       state = STANDBY;
     } else {
-      ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
+      ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
       connect_seq++;
       global_seq = messenger->get_global_seq();
       state = START_CONNECT;
@@ -677,7 +682,7 @@ CtPtr ProtocolV2::_fault() {
     global_seq = messenger->get_global_seq();
     state = START_CONNECT;
     connection->state = AsyncConnection::STATE_CONNECTING;
-    ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
+    ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
     // woke up again;
     connection->register_time_events.insert(
         connection->center->create_time_event(backoff.to_nsec() / 1000,
@@ -724,14 +729,16 @@ void ProtocolV2::send_message(Message *m) {
     // ensure the correctness of message encoding
     bl.clear();
     m->clear_payload();
-    ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
-                  << " != " << connection->get_features() << dendl;
+    ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
+                   << " != " << connection->get_features() << dendl;
   }
   if (state == CLOSED) {
     ldout(cct, 10) << __func__ << " connection closed."
                    << " Drop message " << m << dendl;
     m->put();
   } else {
+    ldout(cct, 5) << __func__ << " enqueueing message m=" << m
+                  << " type=" << m->get_type() << " " << *m << dendl;
     m->trace.event("async enqueueing message");
     out_queue[m->get_priority()].emplace_back(std::move(bl), m);
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
@@ -839,17 +846,17 @@ ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
   encrypt_payload(flat_bl);
   MessageFrame message(this, header2, flat_bl);
 
-  ldout(cct, 5) << __func__ << " sending message type=" << header2.type
-                << " src " << entity_name_t(messenger->get_myname())
-                << " front=" << header2.front_len
-                << " data=" << header2.data_len << " off " << header2.data_off
-                << dendl;
+  ldout(cct, 5) << __func__ << " sending message m=" << m
+                << " seq=" << m->get_seq() << " " << *m << dendl;
 
   bufferlist &msg_bl = message.get_buffer();
   connection->outcoming_bl.claim_append(msg_bl);
 
   m->trace.event("async writing message");
-  ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
+  ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
+                 << " src=" << entity_name_t(messenger->get_myname())
+                 << " front=" << header2.front_len
+                 << " data=" << header2.data_len << " off=" << header2.data_off
                  << dendl;
   ssize_t total_send_size = connection->outcoming_bl.length();
   ssize_t rc = connection->_try_send(more);
@@ -1217,7 +1224,7 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
     return _fault();
   }
 
-unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+  unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
 
   if (memcmp(buffer, CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
     if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
@@ -1250,11 +1257,6 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
   offset += sizeof(__le64);
   peer_required_features = *(__le64 *)(buffer + offset);
 
-  ldout(cct, 1) << __func__ << " banner peer_type=" << (int)peer_type
-                << " supported=" << std::hex << peer_supported_features
-                << " required=" << std::hex << peer_required_features
-                << std::dec << dendl;
-
   if (connection->get_peer_type() == -1) {
     connection->set_peer_type(peer_type);
 
@@ -1277,6 +1279,11 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
     }
   }
 
+  ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type
+                << " supported=" << std::hex << peer_supported_features
+                << " required=" << std::hex << peer_required_features
+                << std::dec << dendl;
+
   // Check feature bit compatibility
 
   __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
@@ -1498,6 +1505,10 @@ CtPtr ProtocolV2::ready() {
   connection->maybe_start_delay_thread();
 
   state = READY;
+  ldout(cct, 1) << __func__ << " entity=" << peer_name << " cookie=" << std::hex
+                << cookie << std::dec << " in_seq=" << in_seq
+                << " out_seq=" << out_seq << dendl;
+
   return CONTINUE(read_frame);
 }
 
@@ -1914,9 +1925,10 @@ CtPtr ProtocolV2::handle_message_complete() {
 
   // note last received message.
   in_seq = message->get_seq();
-  ldout(cct, 5) << " rx " << message->get_source() << " seq "
-                << message->get_seq() << " " << message << " " << *message
-                << dendl;
+  ldout(cct, 5) << __func__ << " received message m=" << message
+                << " seq=" << message->get_seq()
+                << " from=" << message->get_source() << " type=" << header.type
+                << " " << *message << dendl;
 
   bool need_dispatch_writer = true;
   if (!connection->policy.lossy) {
@@ -2693,6 +2705,8 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
     return WRITE(bl, "session retry", read_frame);
   }
 
+  ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
+
   // everything looks good
   exproto->connect_seq = reconnect.connect_seq();
   exproto->message_seq = reconnect.msg_seq();
@@ -2737,9 +2751,9 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
   if (existing->policy.lossy) {
     // existing connection can be thrown out in favor of this one
     ldout(cct, 1)
-        << __func__
-        << " accept replacing existing (lossy) channel (new one lossy="
-        << connection->policy.lossy << ")" << dendl;
+        << __func__ << " existing=" << existing
+        << " is a lossy channel. Stopping existing in favor of this connection"
+        << dendl;
     existing->protocol->stop();
     existing->dispatch_queue->queue_reset(existing.get());
     return send_server_ident();
@@ -2749,9 +2763,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
     // Found previous session
     // peer has reseted and we're going to reuse the existing connection
     // by replacing the communication socket
-    ldout(cct, 1) << __func__
-                  << " found previous session, peer must have reseted."
-                  << dendl;
+    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
+                  << ", peer must have reseted." << dendl;
     if (connection->policy.resetcheck) {
       exproto->reset_session();
     }
@@ -2759,9 +2772,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
   }
 
   if (exproto->state == READY || exproto->state == STANDBY) {
-    ldout(cct, 1) << __func__
-                  << " existing connection is READY/STANDBY, lets reuse it"
-                  << dendl;
+    ldout(cct, 1) << __func__ << " existing=" << existing
+                  << " is READY/STANDBY, lets reuse it" << dendl;
     return reuse_connection(existing, exproto, false);
   }
 
@@ -2773,14 +2785,14 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
     // this connection wins
     ldout(cct, 1) << __func__
                   << " connection race detected, replacing existing="
-                  << existing << " socket by this connection's socket"
-                  << dendl;
+                  << existing << " socket by this connection's socket" << dendl;
     return reuse_connection(existing, exproto, false);
   } else {
     // the existing connection wins
-    ldout(cct, 1) << __func__
-                  << " connection race detected, this connection loses"
-                  << dendl;
+    ldout(cct, 1)
+        << __func__
+        << " connection race detected, this connection loses to existing="
+        << existing << dendl;
     ceph_assert(connection->peer_addrs->msgr2_addr() >
                 messenger->get_myaddrs().msgr2_addr());
 
@@ -2822,13 +2834,15 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   auto temp_cs = std::move(connection->cs);
   EventCenter *new_center = connection->center;
   Worker *new_worker = connection->worker;
+
+  ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
+                           << dendl;
   // avoid _stop shutdown replacing socket
   // queue a reset on the new connection, which we're dumping for the old
   stop();
 
   connection->dispatch_queue->queue_reset(connection);
-  ldout(messenger->cct, 1) << __func__ << " stop myself to swap existing"
-                           << dendl;
+
   exproto->can_write = false;
   exproto->replacing = true;
   exproto->session_security = session_security;