OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
OPTION(ms_initial_backoff, OPT_DOUBLE, .2)
OPTION(ms_max_backoff, OPT_DOUBLE, 15.0)
-OPTION(ms_nocrc, OPT_BOOL, false)
+OPTION(ms_crc_data, OPT_BOOL, true)
+OPTION(ms_crc_header, OPT_BOOL, true)
OPTION(ms_die_on_bad_msg, OPT_BOOL, false)
OPTION(ms_die_on_unhandled_msg, OPT_BOOL, false)
OPTION(ms_die_on_old_message, OPT_BOOL, false) // assert if we get a dup incoming message and shouldn't have (may be triggered by pre-541cd3c64be0dfa04e8a2df39422e0eb9541a428 code)
::decode(tid, p);
::decode(client, p);
::decode(client_caps, p);
- msg = (PaxosServiceMessage *)decode_message(NULL, p);
+ msg = (PaxosServiceMessage *)decode_message(NULL, 0, p);
if (header.version >= 2) {
::decode(con_features, p);
} else {
MRoute(bufferlist bl, const entity_inst_t& i)
: Message(MSG_ROUTE, HEAD_VERSION, COMPAT_VERSION), session_mon_tid(0), dest(i) {
bufferlist::iterator p = bl.begin();
- msg = decode_message(NULL, p);
+ msg = decode_message(NULL, 0, p);
}
private:
~MRoute() {
bool m;
::decode(m, p);
if (m)
- msg = decode_message(NULL, p);
+ msg = decode_message(NULL, 0, p);
} else {
- msg = decode_message(NULL, p);
+ msg = decode_message(NULL, 0, p);
}
}
void encode_payload(uint64_t features) {
RoutedRequest *rr = p->second;
bufferlist::iterator q = rr->request_bl.begin();
- PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q);
+ PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
if (mon == rank) {
dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
#define dout_subsys ceph_subsys_ms
-void Message::encode(uint64_t features, bool datacrc)
+void Message::encode(uint64_t features, int crcflags)
{
// encode and copy out of *m
if (empty_payload()) {
if (header.compat_version == 0)
header.compat_version = header.version;
}
- calc_front_crc();
+ if (crcflags & MSG_CRC_HEADER)
+ calc_front_crc();
// update envelope
header.front_len = get_payload().length();
header.middle_len = get_middle().length();
header.data_len = get_data().length();
- calc_header_crc();
+ if (crcflags & MSG_CRC_HEADER)
+ calc_header_crc();
footer.flags = CEPH_MSG_FOOTER_COMPLETE;
- if (datacrc) {
+ if (crcflags & MSG_CRC_DATA) {
calc_data_crc();
#ifdef ENCODE_DUMP
f->dump_string("summary", ss.str());
}
-Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_footer& footer,
- bufferlist& front, bufferlist& middle, bufferlist& data)
+Message *decode_message(CephContext *cct, int crcflags,
+ ceph_msg_header& header,
+ ceph_msg_footer& footer,
+ bufferlist& front, bufferlist& middle,
+ bufferlist& data)
{
// verify crc
- if (!cct || !cct->_conf->ms_nocrc) {
+ if (crcflags & MSG_CRC_HEADER) {
__u32 front_crc = front.crc32c(0);
__u32 middle_crc = middle.crc32c(0);
// We've slipped in a 0 signature at this point, so any signature checking after this will
// fail. PLR
-Message *decode_message(CephContext *cct, bufferlist::iterator& p)
+Message *decode_message(CephContext *cct, int crcflags, bufferlist::iterator& p)
{
ceph_msg_header h;
ceph_msg_footer_old fo;
::decode(fr, p);
::decode(mi, p);
::decode(da, p);
- return decode_message(cct, h, f, fr, mi, da);
+ return decode_message(cct, crcflags, h, f, fr, mi, da);
}
#define MSG_TIMECHECK 0x600
#define MSG_MON_HEALTH 0x601
+// *** Message::encode() crcflags bits ***
+#define MSG_CRC_DATA 1
+#define MSG_CRC_HEADER 2
+
// ======================================================
virtual void dump(Formatter *f) const;
- void encode(uint64_t features, bool datacrc);
+ void encode(uint64_t features, int crcflags);
};
typedef boost::intrusive_ptr<Message> MessageRef;
-extern Message *decode_message(CephContext *cct, ceph_msg_header &header,
+extern Message *decode_message(CephContext *cct, int crcflags,
+ ceph_msg_header &header,
ceph_msg_footer& footer, bufferlist& front,
bufferlist& middle, bufferlist& data);
inline ostream& operator<<(ostream& out, Message& m) {
}
extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
-extern Message *decode_message(CephContext *cct, bufferlist::iterator& bl);
+extern Message *decode_message(CephContext *cct, int crcflags,
+ bufferlist::iterator& bl);
#endif
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
#include "include/types.h"
#include "Messenger.h"
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return NULL;
}
+
+/*
+ * Pre-calculate desired software CRC settings. CRC computation may
+ * be disabled by default for some transports (e.g., those with strong
+ * hardware checksum support).
+ */
+int Messenger::get_default_crc_flags(md_config_t * conf)
+{
+ int r = 0;
+ if (conf->ms_crc_data)
+ r |= MSG_CRC_DATA;
+ if (conf->ms_crc_header)
+ r |= MSG_CRC_HEADER;
+ return r;
+}
* from this value.
*/
CephContext *cct;
+ int crcflags;
/**
* A Policy describes the rules of a Connection. Is there a limit on how
Messenger(CephContext *cct_, entity_name_t w)
: my_inst(),
default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
- cct(cct_)
+ cct(cct_),
+ crcflags(get_default_crc_flags(cct->_conf))
{
my_inst.name = w;
}
* (0 if the queue is empty)
*/
virtual double get_dispatch_queue_max_age(utime_t now) = 0;
+ /**
+ * Get the default crc flags for this messenger.
+ * but not yet dispatched.
+ */
+ static int get_default_crc_flags(md_config_t *);
/**
* @} // Accessors
ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length()
<< " + " << data.length() << " byte message" << dendl;
- Message *message = decode_message(async_msgr->cct, current_header, footer, front, middle, data);
+ Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer, front, middle, data);
if (!message) {
ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl;
goto fail;
<< features << " " << m << " " << *m << dendl;
// encode and copy out of *m
- m->encode(features, !async_msgr->cct->_conf->ms_nocrc);
+ m->encode(features, async_msgr->crcflags);
// prepare everything
ceph_msg_header& header = m->get_header();
<< " " << m << " " << *m << dendl;
// encode and copy out of *m
- m->encode(features, !msgr->cct->_conf->ms_nocrc);
+ m->encode(features, msgr->crcflags);
// prepare everything
ceph_msg_header& header = m->get_header();
ceph_msg_header header;
ceph_msg_footer footer;
__u32 header_crc;
-
+
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read((char*)&header, sizeof(header)) < 0)
return -1;
- header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+ }
} else {
ceph_msg_header_old oldheader;
if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
memcpy(&header, &oldheader, sizeof(header));
header.src = oldheader.src.name;
header.reserved = oldheader.reserved;
- header.crc = oldheader.crc;
- header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ header.crc = oldheader.crc;
+ header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
+ }
}
ldout(msgr->cct,20) << "reader got envelope type=" << header.type
<< dendl;
// verify header crc
- if (header_crc != header.crc) {
+ if (!(msgr->crcflags & MSG_CRC_HEADER)) {
+ } else if (header_crc != header.crc) {
ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return -1;
}
ceph_msg_footer_old old_footer;
if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
goto out_dethrottle;
- footer.front_crc = old_footer.front_crc;
- footer.middle_crc = old_footer.middle_crc;
- footer.data_crc = old_footer.data_crc;
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ footer.front_crc = old_footer.front_crc;
+ footer.middle_crc = old_footer.middle_crc;
+ footer.data_crc = old_footer.data_crc;
+ }
footer.sig = 0;
footer.flags = old_footer.flags;
}
ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message" << dendl;
- message = decode_message(msgr->cct, header, footer, front, middle, data);
+ message = decode_message(msgr->cct, msgr->crcflags, header, footer, front, middle, data);
if (!message) {
ret = -EINVAL;
goto out_dethrottle;
oldheader.src.addr = connection_state->get_peer_addr();
oldheader.orig_src = oldheader.src;
oldheader.reserved = header.reserved;
- oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
- sizeof(oldheader) - sizeof(oldheader.crc));
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
+ sizeof(oldheader) - sizeof(oldheader.crc));
+ } else {
+ oldheader.crc = 0;
+ }
msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
msglen += sizeof(oldheader);
msglen += sizeof(footer);
msg.msg_iovlen++;
} else {
- old_footer.front_crc = footer.front_crc;
- old_footer.middle_crc = footer.middle_crc;
- old_footer.data_crc = footer.data_crc;
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ old_footer.front_crc = footer.front_crc;
+ old_footer.middle_crc = footer.middle_crc;
+ old_footer.data_crc = footer.data_crc;
+ } else {
+ old_footer.front_crc = old_footer.middle_crc = old_footer.data_crc = 0;
+ }
old_footer.flags = footer.flags;
msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
bufferlist::iterator p = bl.begin();
p.seek(seek);
try {
- Message *n = decode_message(g_ceph_context, p);
+ Message *n = decode_message(g_ceph_context, 0, p);
if (!n)
throw std::runtime_error("failed to decode");
if (n->get_type() != m_object->get_type()) {