#define dout_subsys ceph_subsys_ms
-void Message::encode(uint64_t features, int crcflags, bool skip_header_crc)
+void Message::encode(uint64_t features, int crcflags, bool skip_header_crc, CephContext *cct)
{
// encode and copy out of *m
if (empty_payload()) {
if (crcflags & MSG_CRC_DATA) {
calc_data_crc();
-
-#ifdef ENCODE_DUMP
- bufferlist bl;
- encode(get_header(), bl);
-
- // dump the old footer format
- ceph_msg_footer_old old_footer;
- old_footer.front_crc = footer.front_crc;
- old_footer.middle_crc = footer.middle_crc;
- old_footer.data_crc = footer.data_crc;
- old_footer.flags = footer.flags;
- encode(old_footer, bl);
-
- encode(get_payload(), bl);
- encode(get_middle(), bl);
- encode(get_data(), bl);
-
- // this is almost an exponential backoff, except because we count
- // bits we tend to sample things we encode later, which should be
- // more representative.
- static int i = 0;
- i++;
- int bits = 0;
- for (unsigned t = i; t; bits++)
- t &= t - 1;
- if (bits <= 2) {
- char fn[200];
- int status;
- snprintf(fn, sizeof(fn), ENCODE_STRINGIFY(ENCODE_DUMP) "/%s__%d.%x",
- abi::__cxa_demangle(typeid(*this).name(), 0, 0, &status),
- getpid(), i++);
- int fd = ::open(fn, O_WRONLY|O_TRUNC|O_CREAT|O_CLOEXEC|O_BINARY, 0644);
- if (fd >= 0) {
- bl.write_fd(fd);
- ::close(fd);
- }
- }
-#endif
} else {
footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC;
}
+ if (header.type == MSG_PGSTATS) {
+ if (cct && cct->_conf->ms_msg_pgstats_verbose) {
+ ldout(cct, 0) << "Encoding MSG_PGSTATS payload tid= " << header.tid
+ << " payload len=" << get_payload().length() << " bytes\n"
+ << "PGSTATS_ENC BEGIN tid=" << header.tid << "\n";
+ get_payload().hexdump(*_dout);
+ *_dout << "PGSTATS_ENC END tid=" << header.tid << dendl;
+ }
+ }
}
void Message::dump(ceph::Formatter *f) const
m->set_data(data);
try {
+ if (header.type == MSG_PGSTATS) {
+ if (cct && cct->_conf->ms_msg_pgstats_verbose) {
+ ldout(cct, 0) << "Decoding MSG_PGSTATS payload tid= " << m->get_header().tid
+ << " payload len=" << m->get_payload().length() << " bytes\n"
+ << "PGSTATS_DEC BEGIN tid=" << m->get_header().tid << "\n";
+ m->get_payload().hexdump(*_dout);
+ *_dout << "PGSTATS_DEC END tid=" << m->get_header().tid << dendl;
+ }
+ }
m->decode_payload();
}
catch (const ceph::buffer::error &e) {
ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
<< features << " " << m << " " << *m << dendl;
+ if (m->get_type() == MSG_PGSTATS && m->get_tid() == 0) {
+ // assign a random tid if not already set, exclusively for MSG_PGSTATS
+ // for debugging purposes since it is easier to track messages with a tid
+ // in the logs
+ m->set_tid(make_tid_uuid());
+ }
+
// encode and copy out of *m
- m->encode(features, 0);
+ m->encode(features, 0, false, cct);
+ if (m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) {
+ ldout(cct, 0) << "We are preparing to send MSG_PGSTATS; payload length: " << m->get_payload().length()
+ << "; middle length: " << m->get_middle().length()
+ << "; data length: " << m->get_data().length()
+ << dendl;
+ }
}
void ProtocolV2::send_message(Message *m) {
return out_entry;
}
+uint64_t ProtocolV2::make_tid_uuid() {
+ uuid_d u;
+ u.generate_random();
+ // collapse the first 8 bytes of the UUID into a uint64_t
+ uint64_t val;
+ memcpy(&val, u.bytes(), sizeof(val));
+ return val;
+}
+
ssize_t ProtocolV2::write_message(Message *m, bool more) {
FUNCTRACE(cct);
ceph_assert(connection->center->in_thread());
m->set_seq(++out_seq);
-
connection->lock.lock();
uint64_t ack_seq = in_seq;
ack_left = 0;
m->get_payload(),
m->get_middle(),
m->get_data());
- if (!append_frame(message)) {
+
+ bool is_appended;
+ if (m->get_type() == MSG_PGSTATS) {
+ is_appended = append_frame(message, m);
+ } else {
+ is_appended = append_frame(message);
+ }
+ if (!is_appended) {
m->put();
return -EILSEQ;
}
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
-
+ if (m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) {
+ ldout(cct, 0) << "We are sending MSG_PGSTATS; payload length: " << m->get_payload().length()
+ << "; middle length: " << m->get_middle().length()
+ << "; data length: " << m->get_data().length()
+ << "; header seq: " << m->get_seq()
+ << "; header tid: " << m->get_tid()
+ << "; header version: " << m->get_header().version
+ << "; header compat version: " << m->get_header().compat_version
+ << "; total frame length (after encode): " << connection->outgoing_bl.length()
+ << dendl;
+ }
m->trace.event("async writing message");
ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
<< " src=" << entity_name_t(messenger->get_myname())
}
template <class F>
-bool ProtocolV2::append_frame(F& frame) {
+bool ProtocolV2::append_frame(F& frame, Message *m) {
ceph::bufferlist bl;
try {
bl = frame.get_buffer(tx_frame_asm);
} catch (ceph::crypto::onwire::TxHandlerError &e) {
- ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ ldout(cct, 0) << __func__ << " Error: " << e.what() << dendl;
return false;
}
-
+ if (m && m->get_type() == MSG_PGSTATS && cct->_conf->ms_msg_pgstats_verbose) {
+ ldout(cct, 0) << "We are appending frame for MSG_PGSTATS; frame length: "
+ << bl.length()
+ << " bytes" << tx_frame_asm << dendl;
+ }
ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
<< " bytes " << tx_frame_asm << dendl;
connection->outgoing_bl.claim_append(bl);
const size_t cur_msg_size = get_current_msg_size();
auto msg_frame = MessageFrame::Decode(rx_segments_data);
-
+ if (msg_frame.header().type == MSG_PGSTATS) {
+ if (cct->_conf->ms_msg_pgstats_verbose) {
+ ldout(cct, 0) << " We have decoded MSG_PGSTATS message with "
+ << " message size: " << cur_msg_size
+ << dendl;
+ }
+ }
// XXX: paranoid copy just to avoid oops
ceph_msg_header2 current_header = msg_frame.header();
<< " src " << peer_name
<< " off " << current_header.data_off
<< dendl;
+
+ if (current_header.type == MSG_PGSTATS) {
+ if (cct->_conf->ms_msg_pgstats_verbose) {
+ uint64_t onwire_len = rx_frame_asm.get_frame_onwire_len();
+ ldout(cct, 0) << __func__
+ << " got front_length (payload): " << msg_frame.front_len()
+ << " + middle_length: " << msg_frame.middle_len()
+ << " + data_length: " << msg_frame.data_len()
+ << " byte message."
+ << " envelope type=" << current_header.type
+ << " src= " << peer_name
+ << " off " << current_header.data_off
+ << dendl;
+
+ ldout(cct, 0) << "We are handling MSG_PGSTATS; payload length: " << msg_frame.front_len()
+ << "; middle length: " << msg_frame.middle_len()
+ << "; data length: " << msg_frame.data_len()
+ << "; header seq: " << current_header.seq
+ << "; header tid: " << current_header.tid
+ << "; header version: " << current_header.version
+ << "; header compat version: " << current_header.compat_version
+ << "; total frame length (before decode): " << onwire_len
+ << dendl;
+ }
+}
INTERCEPT(16);
ceph_msg_header header{current_header.seq,