From 171e4b0c40d454e8cadabbf36fb1ffe841fdee53 Mon Sep 17 00:00:00 2001 From: Kamoltat Sirivadhna Date: Wed, 1 Oct 2025 19:53:39 +0000 Subject: [PATCH] src/msg: attach header tid + Added more loggings Also dump the encoded payload on the sender and receiver side Signed-off-by: Kamoltat Sirivadhna --- src/common/options/global.yaml.in | 5 ++ src/messages/MPGStats.h | 2 +- src/msg/Message.cc | 58 +++++++------------- src/msg/Message.h | 2 +- src/msg/async/ProtocolV1.cc | 2 +- src/msg/async/ProtocolV2.cc | 89 ++++++++++++++++++++++++++++--- src/msg/async/ProtocolV2.h | 4 +- 7 files changed, 110 insertions(+), 52 deletions(-) diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index f0eaedf5af9..81a09955760 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -1504,6 +1504,11 @@ options: level: advanced default: 8192 with_legacy: true +- name: ms_msg_pgstats_verbose + type: bool + level: advanced + default: false + with_legacy: true - name: inject_early_sigterm type: bool level: dev diff --git a/src/messages/MPGStats.h b/src/messages/MPGStats.h index 65cec524478..5ac03431aea 100644 --- a/src/messages/MPGStats.h +++ b/src/messages/MPGStats.h @@ -42,7 +42,7 @@ private: public: std::string_view get_type_name() const override { return "pg_stats"; } void print(std::ostream& out) const override { - out << "pg_stats(" << pg_stat.size() << " pgs seq " << osd_stat.seq << " v " << version << ")"; + out << "pg_stats( pg_stat size " << pg_stat.size() << " osd_stat seq " << osd_stat.seq << " v " << version << ")"; } void encode_payload(uint64_t features) override { diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 348546abdf0..a4f3a0c9663 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -225,7 +225,7 @@ #define dout_subsys ceph_subsys_ms -void Message::encode(uint64_t features, int crcflags, bool skip_header_crc) +void Message::encode(uint64_t features, int crcflags, bool skip_header_crc, CephContext *cct) { // encode and copy out of *m if (empty_payload()) { @@ -255,47 +255,18 @@ void Message::encode(uint64_t features, int crcflags, bool skip_header_crc) if (crcflags & MSG_CRC_DATA) { calc_data_crc(); - -#ifdef ENCODE_DUMP - bufferlist bl; - encode(get_header(), bl); - - // dump the old footer format - ceph_msg_footer_old old_footer; - old_footer.front_crc = footer.front_crc; - old_footer.middle_crc = footer.middle_crc; - old_footer.data_crc = footer.data_crc; - old_footer.flags = footer.flags; - encode(old_footer, bl); - - encode(get_payload(), bl); - encode(get_middle(), bl); - encode(get_data(), bl); - - // this is almost an exponential backoff, except because we count - // bits we tend to sample things we encode later, which should be - // more representative. - static int i = 0; - i++; - int bits = 0; - for (unsigned t = i; t; bits++) - t &= t - 1; - if (bits <= 2) { - char fn[200]; - int status; - snprintf(fn, sizeof(fn), ENCODE_STRINGIFY(ENCODE_DUMP) "/%s__%d.%x", - abi::__cxa_demangle(typeid(*this).name(), 0, 0, &status), - getpid(), i++); - int fd = ::open(fn, O_WRONLY|O_TRUNC|O_CREAT|O_CLOEXEC|O_BINARY, 0644); - if (fd >= 0) { - bl.write_fd(fd); - ::close(fd); - } - } -#endif } else { footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC; } + if (header.type == MSG_PGSTATS) { + if (cct && cct->_conf->ms_msg_pgstats_verbose) { + ldout(cct, 0) << "Encoding MSG_PGSTATS payload tid= " << header.tid + << " payload len=" << get_payload().length() << " bytes\n" + << "PGSTATS_ENC BEGIN tid=" << header.tid << "\n"; + get_payload().hexdump(*_dout); + *_dout << "PGSTATS_ENC END tid=" << header.tid << dendl; + } + } } void Message::dump(ceph::Formatter *f) const @@ -966,6 +937,15 @@ Message *decode_message(CephContext *cct, m->set_data(data); try { + if (header.type == MSG_PGSTATS) { + if (cct && cct->_conf->ms_msg_pgstats_verbose) { + ldout(cct, 0) << "Decoding MSG_PGSTATS payload tid= " << m->get_header().tid + << " payload len=" << m->get_payload().length() << " bytes\n" + << "PGSTATS_DEC BEGIN tid=" << m->get_header().tid << "\n"; + m->get_payload().hexdump(*_dout); + *_dout << "PGSTATS_DEC END tid=" << m->get_header().tid << dendl; + } + } m->decode_payload(); } catch (const ceph::buffer::error &e) { diff --git a/src/msg/Message.h b/src/msg/Message.h index f27c5448ea2..3286ea05afd 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -530,7 +530,7 @@ public: virtual void dump(ceph::Formatter *f) const; - void encode(uint64_t features, int crcflags, bool skip_header_crc = false); + void encode(uint64_t features, int crcflags, bool skip_header_crc = false, CephContext *cct = nullptr); }; extern Message *decode_message(CephContext *cct, diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index b45ad8ca515..dcc59cee5d1 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -259,7 +259,7 @@ void ProtocolV1::prepare_send_message(uint64_t features, Message *m, // encode and copy out of *m // in write_message we update header.seq and need recalc crc // so skip calc header in encode function. - m->encode(features, messenger->crcflags, true); + m->encode(features, messenger->crcflags, true, cct); bl.append(m->get_payload()); bl.append(m->get_middle()); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 08426b796b8..c8f29066fc2 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -421,8 +421,21 @@ void ProtocolV2::prepare_send_message(uint64_t features, ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ") << features << " " << m << " " << *m << dendl; + if (m->get_type() == MSG_PGSTATS && m->get_tid() == 0) { + // assign a random tid if not already set, exclusively for MSG_PGSTATS + // for debugging purposes since it is easier to track messages with a tid + // in the logs + m->set_tid(make_tid_uuid()); + } + // encode and copy out of *m - m->encode(features, 0); + m->encode(features, 0, false, cct); + if (m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) { + ldout(cct, 0) << "We are preparing to send MSG_PGSTATS; payload length: " << m->get_payload().length() + << "; middle length: " << m->get_middle().length() + << "; data length: " << m->get_data().length() + << dendl; + } } void ProtocolV2::send_message(Message *m) { @@ -517,11 +530,19 @@ ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() { return out_entry; } +uint64_t ProtocolV2::make_tid_uuid() { + uuid_d u; + u.generate_random(); + // collapse the first 8 bytes of the UUID into a uint64_t + uint64_t val; + memcpy(&val, u.bytes(), sizeof(val)); + return val; +} + ssize_t ProtocolV2::write_message(Message *m, bool more) { FUNCTRACE(cct); ceph_assert(connection->center->in_thread()); m->set_seq(++out_seq); - connection->lock.lock(); uint64_t ack_seq = in_seq; ack_left = 0; @@ -543,14 +564,31 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { m->get_payload(), m->get_middle(), m->get_data()); - if (!append_frame(message)) { + + bool is_appended; + if (m->get_type() == MSG_PGSTATS) { + is_appended = append_frame(message, m); + } else { + is_appended = append_frame(message); + } + if (!is_appended) { m->put(); return -EILSEQ; } ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; - + if (m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) { + ldout(cct, 0) << "We are sending MSG_PGSTATS; payload length: " << m->get_payload().length() + << "; middle length: " << m->get_middle().length() + << "; data length: " << m->get_data().length() + << "; header seq: " << m->get_seq() + << "; header tid: " << m->get_tid() + << "; header version: " << m->get_header().version + << "; header compat version: " << m->get_header().compat_version + << "; total frame length (after encode): " << connection->outgoing_bl.length() + << dendl; + } m->trace.event("async writing message"); ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq() << " src=" << entity_name_t(messenger->get_myname()) @@ -583,15 +621,19 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { } template -bool ProtocolV2::append_frame(F& frame) { +bool ProtocolV2::append_frame(F& frame, Message *m) { ceph::bufferlist bl; try { bl = frame.get_buffer(tx_frame_asm); } catch (ceph::crypto::onwire::TxHandlerError &e) { - ldout(cct, 1) << __func__ << " " << e.what() << dendl; + ldout(cct, 0) << __func__ << " Error: " << e.what() << dendl; return false; } - + if (m && m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) { + ldout(cct, 0) << "We are appending frame for MSG_PGSTATS; frame length: " + << bl.length() + << " bytes" << tx_frame_asm << dendl; + } ldout(cct, 25) << __func__ << " assembled frame " << bl.length() << " bytes " << tx_frame_asm << dendl; connection->outgoing_bl.claim_append(bl); @@ -1363,7 +1405,13 @@ CtPtr ProtocolV2::handle_message() { const size_t cur_msg_size = get_current_msg_size(); auto msg_frame = MessageFrame::Decode(rx_segments_data); - + if (msg_frame.header().type == MSG_PGSTATS) { + if (cct->_conf->ms_msg_pgstats_verbose) { + ldout(cct, 0) << " We have decoded MSG_PGSTATS message with " + << " message size: " << cur_msg_size + << dendl; + } + } // XXX: paranoid copy just to avoid oops ceph_msg_header2 current_header = msg_frame.header(); @@ -1376,6 +1424,31 @@ CtPtr ProtocolV2::handle_message() { << " src " << peer_name << " off " << current_header.data_off << dendl; + + if (current_header.type == MSG_PGSTATS) { + if (cct->_conf->ms_msg_pgstats_verbose) { + uint64_t onwire_len = rx_frame_asm.get_frame_onwire_len(); + ldout(cct, 0) << __func__ + << " got front_length (payload): " << msg_frame.front_len() + << " + middle_length: " << msg_frame.middle_len() + << " + data_length: " << msg_frame.data_len() + << " byte message." + << " envelope type=" << current_header.type + << " src= " << peer_name + << " off " << current_header.data_off + << dendl; + + ldout(cct, 0) << "We are handling MSG_PGSTATS; payload length: " << msg_frame.front_len() + << "; middle length: " << msg_frame.middle_len() + << "; data length: " << msg_frame.data_len() + << "; header seq: " << current_header.seq + << "; header tid: " << current_header.tid + << "; header version: " << current_header.version + << "; header compat version: " << current_header.compat_version + << "; total frame length (before decode): " << onwire_len + << dendl; + } +} INTERCEPT(16); ceph_msg_header header{current_header.seq, diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 6441866fea4..a17593979e6 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -138,8 +138,8 @@ private: ceph::bufferlist &buffer); template - bool append_frame(F& frame); - + bool append_frame(F& frame, Message *m = nullptr); + uint64_t make_tid_uuid(); void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); void reset_recv_state(); -- 2.39.5