]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
src/msg: attach header tid + Added more loggings reef-18.2.4 wip-ksirivad-cbuild-reef-18.2.4
authorKamoltat Sirivadhna <ksirivad@redhat.com>
Wed, 1 Oct 2025 19:53:39 +0000 (19:53 +0000)
committerKamoltat Sirivadhna <ksirivad@redhat.com>
Tue, 7 Oct 2025 20:27:57 +0000 (20:27 +0000)
Also dump the encoded payload on the sender
and receiver side

Signed-off-by: Kamoltat Sirivadhna <ksirivad@redhat.com>
src/common/options/global.yaml.in
src/messages/MPGStats.h
src/msg/Message.cc
src/msg/Message.h
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index f0eaedf5af9b556142cb83f0766b2fd454cff545..81a0995576033478419bbcb51bee55412ae40fc9 100644 (file)
@@ -1504,6 +1504,11 @@ options:
   level: advanced
   default: 8192
   with_legacy: true
+- name: ms_msg_pgstats_verbose
+  type: bool
+  level: advanced
+  default: false
+  with_legacy: true
 - name: inject_early_sigterm
   type: bool
   level: dev
index 65cec5244788998bd6885a2d7d1adf68902e96c7..5ac03431aea582d8c550ffb9805c42a77a8503a6 100644 (file)
@@ -42,7 +42,7 @@ private:
 public:
   std::string_view get_type_name() const override { return "pg_stats"; }
   void print(std::ostream& out) const override {
-    out << "pg_stats(" << pg_stat.size() << " pgs seq " << osd_stat.seq << " v " << version << ")";
+    out << "pg_stats( pg_stat size " << pg_stat.size() << " osd_stat seq " << osd_stat.seq << " v " << version << ")";
   }
 
   void encode_payload(uint64_t features) override {
index 348546abdf01102a9bdf09ed79af323ecf64017f..a4f3a0c9663ddcdec71d249bca432235611c6ba7 100644 (file)
 
 #define dout_subsys ceph_subsys_ms
 
-void Message::encode(uint64_t features, int crcflags, bool skip_header_crc)
+void Message::encode(uint64_t features, int crcflags, bool skip_header_crc, CephContext *cct)
 {
   // encode and copy out of *m
   if (empty_payload()) {
@@ -255,47 +255,18 @@ void Message::encode(uint64_t features, int crcflags, bool skip_header_crc)
 
   if (crcflags & MSG_CRC_DATA) {
     calc_data_crc();
-
-#ifdef ENCODE_DUMP
-    bufferlist bl;
-    encode(get_header(), bl);
-
-    // dump the old footer format
-    ceph_msg_footer_old old_footer;
-    old_footer.front_crc = footer.front_crc;
-    old_footer.middle_crc = footer.middle_crc;
-    old_footer.data_crc = footer.data_crc;
-    old_footer.flags = footer.flags;
-    encode(old_footer, bl);
-
-    encode(get_payload(), bl);
-    encode(get_middle(), bl);
-    encode(get_data(), bl);
-
-    // this is almost an exponential backoff, except because we count
-    // bits we tend to sample things we encode later, which should be
-    // more representative.
-    static int i = 0;
-    i++;
-    int bits = 0;
-    for (unsigned t = i; t; bits++)
-      t &= t - 1;
-    if (bits <= 2) {
-      char fn[200];
-      int status;
-      snprintf(fn, sizeof(fn), ENCODE_STRINGIFY(ENCODE_DUMP) "/%s__%d.%x",
-              abi::__cxa_demangle(typeid(*this).name(), 0, 0, &status),
-              getpid(), i++);
-      int fd = ::open(fn, O_WRONLY|O_TRUNC|O_CREAT|O_CLOEXEC|O_BINARY, 0644);
-      if (fd >= 0) {
-       bl.write_fd(fd);
-       ::close(fd);
-      }
-    }
-#endif
   } else {
     footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC;
   }
+  if (header.type == MSG_PGSTATS) {
+    if (cct && cct->_conf->ms_msg_pgstats_verbose) {
+      ldout(cct, 0) << "Encoding MSG_PGSTATS payload tid= " << header.tid
+        << " payload len=" << get_payload().length() << " bytes\n"
+        << "PGSTATS_ENC BEGIN tid=" << header.tid << "\n";
+        get_payload().hexdump(*_dout);
+        *_dout << "PGSTATS_ENC END tid=" << header.tid << dendl;
+    }
+  }
 }
 
 void Message::dump(ceph::Formatter *f) const
@@ -966,6 +937,15 @@ Message *decode_message(CephContext *cct,
   m->set_data(data);
 
   try {
+    if (header.type == MSG_PGSTATS) {
+      if (cct && cct->_conf->ms_msg_pgstats_verbose) {
+        ldout(cct, 0) << "Decoding MSG_PGSTATS payload tid= " << m->get_header().tid
+          << " payload len=" << m->get_payload().length() << " bytes\n"
+          << "PGSTATS_DEC BEGIN tid=" << m->get_header().tid << "\n";
+          m->get_payload().hexdump(*_dout);
+          *_dout << "PGSTATS_DEC END tid=" << m->get_header().tid << dendl;
+        }
+    }
     m->decode_payload();
   }
   catch (const ceph::buffer::error &e) {
index f27c5448ea2daa5fce762bae87374704edab799b..3286ea05afdffd8f6f67f10bb6002674a370c664 100644 (file)
@@ -530,7 +530,7 @@ public:
 
   virtual void dump(ceph::Formatter *f) const;
 
-  void encode(uint64_t features, int crcflags, bool skip_header_crc = false);
+  void encode(uint64_t features, int crcflags, bool skip_header_crc = false, CephContext *cct = nullptr);
 };
 
 extern Message *decode_message(CephContext *cct,
index b45ad8ca5155f37dcadbe2ef7b1dffa4b5122deb..dcc59cee5d141383054834bb7d0c5f97e2a5d164 100644 (file)
@@ -259,7 +259,7 @@ void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
   // encode and copy out of *m
   // in write_message we update header.seq and need recalc crc
   // so skip calc header in encode function.
-  m->encode(features, messenger->crcflags, true);
+  m->encode(features, messenger->crcflags, true, cct);
 
   bl.append(m->get_payload());
   bl.append(m->get_middle());
index 08426b796b88b16c9e0142a7dc7d58d2d8d071f7..c8f29066fc2215648d11077ee64a8d00d70468dc 100644 (file)
@@ -421,8 +421,21 @@ void ProtocolV2::prepare_send_message(uint64_t features,
   ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
                 << features << " " << m  << " " << *m << dendl;
 
+  if (m->get_type() == MSG_PGSTATS && m->get_tid() == 0) {
+    // assign a random tid if not already set, exclusively for MSG_PGSTATS
+    // for debugging purposes since it is easier to track messages with a tid
+    // in the logs
+    m->set_tid(make_tid_uuid());
+  }
+
   // encode and copy out of *m
-  m->encode(features, 0);
+  m->encode(features, 0, false, cct);
+  if (m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) {
+    ldout(cct, 0) << "We are preparing to send MSG_PGSTATS; payload length: " << m->get_payload().length()
+     << "; middle length: " << m->get_middle().length()
+     << "; data length: " << m->get_data().length()
+     << dendl;
+  }
 }
 
 void ProtocolV2::send_message(Message *m) {
@@ -517,11 +530,19 @@ ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
   return out_entry;
 }
 
+uint64_t ProtocolV2::make_tid_uuid() {
+    uuid_d u;
+    u.generate_random();
+    // collapse the first 8 bytes of the UUID into a uint64_t
+    uint64_t val;
+    memcpy(&val, u.bytes(), sizeof(val));
+    return val;
+}
+
 ssize_t ProtocolV2::write_message(Message *m, bool more) {
   FUNCTRACE(cct);
   ceph_assert(connection->center->in_thread());
   m->set_seq(++out_seq);
-
   connection->lock.lock();
   uint64_t ack_seq = in_seq;
   ack_left = 0;
@@ -543,14 +564,31 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                             m->get_payload(),
                             m->get_middle(),
                             m->get_data());
-  if (!append_frame(message)) {
+
+  bool is_appended;
+  if (m->get_type() == MSG_PGSTATS) {
+    is_appended = append_frame(message, m);
+  } else {
+    is_appended = append_frame(message);
+  }
+  if (!is_appended) {
     m->put();
     return -EILSEQ;
   }
 
   ldout(cct, 5) << __func__ << " sending message m=" << m
                 << " seq=" << m->get_seq() << " " << *m << dendl;
-
+  if (m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) {
+    ldout(cct, 0) << "We are sending MSG_PGSTATS; payload length: " << m->get_payload().length()
+     << "; middle length: " << m->get_middle().length()
+     << "; data length: " << m->get_data().length()
+     << "; header seq: " << m->get_seq()
+     << "; header tid: " << m->get_tid()
+     << "; header version: " << m->get_header().version
+     << "; header compat version: " << m->get_header().compat_version
+     << "; total frame length (after encode): " << connection->outgoing_bl.length()
+     << dendl;
+  }
   m->trace.event("async writing message");
   ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
                  << " src=" << entity_name_t(messenger->get_myname())
@@ -583,15 +621,19 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
 }
 
 template <class F>
-bool ProtocolV2::append_frame(F& frame) {
+bool ProtocolV2::append_frame(F& frame, Message *m) {
   ceph::bufferlist bl;
   try {
     bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
-    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    ldout(cct, 0) << __func__ << " Error: " << e.what() << dendl;
     return false;
   }
-
+  if (m && m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) {
+    ldout(cct, 0) << "We are appending frame for MSG_PGSTATS; frame length: "
+                  << bl.length()
+                  << " bytes" << tx_frame_asm << dendl;
+  }
   ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
                  << " bytes " << tx_frame_asm << dendl;
   connection->outgoing_bl.claim_append(bl);
@@ -1363,7 +1405,13 @@ CtPtr ProtocolV2::handle_message() {
 
   const size_t cur_msg_size = get_current_msg_size();
   auto msg_frame = MessageFrame::Decode(rx_segments_data);
-
+  if (msg_frame.header().type == MSG_PGSTATS) {
+    if (cct->_conf->ms_msg_pgstats_verbose) {
+      ldout(cct, 0) << " We have decoded MSG_PGSTATS message with "
+                    << " message size: " << cur_msg_size
+                    << dendl;
+    }
+  }
   // XXX: paranoid copy just to avoid oops
   ceph_msg_header2 current_header = msg_frame.header();
 
@@ -1376,6 +1424,31 @@ CtPtr ProtocolV2::handle_message() {
                << " src " << peer_name
                << " off " << current_header.data_off
                 << dendl;
+  
+  if (current_header.type == MSG_PGSTATS) {
+    if (cct->_conf->ms_msg_pgstats_verbose) {
+      uint64_t onwire_len = rx_frame_asm.get_frame_onwire_len();
+      ldout(cct, 0) << __func__
+        << " got front_length (payload): " << msg_frame.front_len()
+        << " + middle_length: " << msg_frame.middle_len()
+        << " + data_length: " << msg_frame.data_len()
+        << " byte message."
+        << " envelope type=" << current_header.type
+        << " src= " << peer_name
+        << " off " << current_header.data_off
+        << dendl;
+
+      ldout(cct, 0) << "We are handling MSG_PGSTATS; payload length: " << msg_frame.front_len()
+        << "; middle length: " << msg_frame.middle_len()
+        << "; data length: " << msg_frame.data_len()
+        << "; header seq: " << current_header.seq
+        << "; header tid: " << current_header.tid
+        << "; header version: " << current_header.version
+        << "; header compat version: " << current_header.compat_version
+        << "; total frame length (before decode): " << onwire_len
+        << dendl;
+    }
+}
 
   INTERCEPT(16);
   ceph_msg_header header{current_header.seq,
index 6441866fea4c33a6885df660b75dd6608b9384a2..a17593979e6ab039efdd2508467d55db8e0d1c93 100644 (file)
@@ -138,8 +138,8 @@ private:
                         ceph::bufferlist &buffer);
 
   template <class F>
-  bool append_frame(F& frame);
-
+  bool append_frame(F& frame, Message *m = nullptr);
+  uint64_t make_tid_uuid();
   void requeue_sent();
   uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
   void reset_recv_state();