From: Casey Bodley Date: Wed, 3 Sep 2014 16:29:17 +0000 (-0400) Subject: msg: crc configuration in messenger X-Git-Tag: v0.93~265^2~21 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2ffacbe6efae05f5c426cae34882b3351a5ccfbe;p=ceph.git msg: crc configuration in messenger Add new header_crc and data_crc configuration booleans, and use them consistently to govern whether CRC is performed in the Message encode, decode, and transit paths. Remove ms_nocrc, changes per Sage. Mimimally adapt AsyncMessenger for crcflags. Signed-off-by: Casey Bodley Signed-off-by: Matt Benjamin --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 08739c417205..a1e03d2aa894 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -121,7 +121,8 @@ OPTION(ms_tcp_rcvbuf, OPT_INT, 0) OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy OPTION(ms_initial_backoff, OPT_DOUBLE, .2) OPTION(ms_max_backoff, OPT_DOUBLE, 15.0) -OPTION(ms_nocrc, OPT_BOOL, false) +OPTION(ms_crc_data, OPT_BOOL, true) +OPTION(ms_crc_header, OPT_BOOL, true) OPTION(ms_die_on_bad_msg, OPT_BOOL, false) OPTION(ms_die_on_unhandled_msg, OPT_BOOL, false) OPTION(ms_die_on_old_message, OPT_BOOL, false) // assert if we get a dup incoming message and shouldn't have (may be triggered by pre-541cd3c64be0dfa04e8a2df39422e0eb9541a428 code) diff --git a/src/messages/MForward.h b/src/messages/MForward.h index c19865428c2b..92a739364ca6 100644 --- a/src/messages/MForward.h +++ b/src/messages/MForward.h @@ -71,7 +71,7 @@ public: ::decode(tid, p); ::decode(client, p); ::decode(client_caps, p); - msg = (PaxosServiceMessage *)decode_message(NULL, p); + msg = (PaxosServiceMessage *)decode_message(NULL, 0, p); if (header.version >= 2) { ::decode(con_features, p); } else { diff --git a/src/messages/MRoute.h b/src/messages/MRoute.h index 49f639ba1f0f..d7a826e2dec7 100644 --- a/src/messages/MRoute.h +++ b/src/messages/MRoute.h @@ -35,7 +35,7 @@ struct MRoute : public Message { MRoute(bufferlist bl, const entity_inst_t& i) : Message(MSG_ROUTE, HEAD_VERSION, COMPAT_VERSION), session_mon_tid(0), dest(i) { bufferlist::iterator p = bl.begin(); - msg = decode_message(NULL, p); + msg = decode_message(NULL, 0, p); } private: ~MRoute() { @@ -51,9 +51,9 @@ public: bool m; ::decode(m, p); if (m) - msg = decode_message(NULL, p); + msg = decode_message(NULL, 0, p); } else { - msg = decode_message(NULL, p); + msg = decode_message(NULL, 0, p); } } void encode_payload(uint64_t features) { diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 9d2880d9b1e8..2f664316c702 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -2974,7 +2974,7 @@ void Monitor::resend_routed_requests() RoutedRequest *rr = p->second; bufferlist::iterator q = rr->request_bl.begin(); - PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q); + PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q); if (mon == rank) { dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 9b652f1293b4..f103602a8b10 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -173,7 +173,7 @@ using namespace std; #define dout_subsys ceph_subsys_ms -void Message::encode(uint64_t features, bool datacrc) +void Message::encode(uint64_t features, int crcflags) { // encode and copy out of *m if (empty_payload()) { @@ -184,17 +184,19 @@ void Message::encode(uint64_t features, bool datacrc) if (header.compat_version == 0) header.compat_version = header.version; } - calc_front_crc(); + if (crcflags & MSG_CRC_HEADER) + calc_front_crc(); // update envelope header.front_len = get_payload().length(); header.middle_len = get_middle().length(); header.data_len = get_data().length(); - calc_header_crc(); + if (crcflags & MSG_CRC_HEADER) + calc_header_crc(); footer.flags = CEPH_MSG_FOOTER_COMPLETE; - if (datacrc) { + if (crcflags & MSG_CRC_DATA) { calc_data_crc(); #ifdef ENCODE_DUMP @@ -246,11 +248,14 @@ void Message::dump(Formatter *f) const f->dump_string("summary", ss.str()); } -Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_footer& footer, - bufferlist& front, bufferlist& middle, bufferlist& data) +Message *decode_message(CephContext *cct, int crcflags, + ceph_msg_header& header, + ceph_msg_footer& footer, + bufferlist& front, bufferlist& middle, + bufferlist& data) { // verify crc - if (!cct || !cct->_conf->ms_nocrc) { + if (crcflags & MSG_CRC_HEADER) { __u32 front_crc = front.crc32c(0); __u32 middle_crc = middle.crc32c(0); @@ -788,7 +793,7 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload) // We've slipped in a 0 signature at this point, so any signature checking after this will // fail. PLR -Message *decode_message(CephContext *cct, bufferlist::iterator& p) +Message *decode_message(CephContext *cct, int crcflags, bufferlist::iterator& p) { ceph_msg_header h; ceph_msg_footer_old fo; @@ -804,6 +809,6 @@ Message *decode_message(CephContext *cct, bufferlist::iterator& p) ::decode(fr, p); ::decode(mi, p); ::decode(da, p); - return decode_message(cct, h, f, fr, mi, da); + return decode_message(cct, crcflags, h, f, fr, mi, da); } diff --git a/src/msg/Message.h b/src/msg/Message.h index bf447ab1f982..0d45c2dc8f87 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -169,6 +169,10 @@ #define MSG_TIMECHECK 0x600 #define MSG_MON_HEALTH 0x601 +// *** Message::encode() crcflags bits *** +#define MSG_CRC_DATA 1 +#define MSG_CRC_HEADER 2 + // ====================================================== @@ -422,11 +426,12 @@ public: virtual void dump(Formatter *f) const; - void encode(uint64_t features, bool datacrc); + void encode(uint64_t features, int crcflags); }; typedef boost::intrusive_ptr MessageRef; -extern Message *decode_message(CephContext *cct, ceph_msg_header &header, +extern Message *decode_message(CephContext *cct, int crcflags, + ceph_msg_header &header, ceph_msg_footer& footer, bufferlist& front, bufferlist& middle, bufferlist& data); inline ostream& operator<<(ostream& out, Message& m) { @@ -437,6 +442,7 @@ inline ostream& operator<<(ostream& out, Message& m) { } extern void encode_message(Message *m, uint64_t features, bufferlist& bl); -extern Message *decode_message(CephContext *cct, bufferlist::iterator& bl); +extern Message *decode_message(CephContext *cct, int crcflags, + bufferlist::iterator& bl); #endif diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index 766cf172f2e5..be0091d72a94 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -1,3 +1,5 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab #include "include/types.h" #include "Messenger.h" @@ -20,3 +22,18 @@ Messenger *Messenger::create(CephContext *cct, const string &type, lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl; return NULL; } + +/* + * Pre-calculate desired software CRC settings. CRC computation may + * be disabled by default for some transports (e.g., those with strong + * hardware checksum support). + */ +int Messenger::get_default_crc_flags(md_config_t * conf) +{ + int r = 0; + if (conf->ms_crc_data) + r |= MSG_CRC_DATA; + if (conf->ms_crc_header) + r |= MSG_CRC_HEADER; + return r; +} diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index d6b542c4990f..b21bb336d26b 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -54,6 +54,7 @@ public: * from this value. */ CephContext *cct; + int crcflags; /** * A Policy describes the rules of a Connection. Is there a limit on how @@ -126,7 +127,8 @@ public: Messenger(CephContext *cct_, entity_name_t w) : my_inst(), default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), - cct(cct_) + cct(cct_), + crcflags(get_default_crc_flags(cct->_conf)) { my_inst.name = w; } @@ -216,6 +218,11 @@ public: * (0 if the queue is empty) */ virtual double get_dispatch_queue_max_age(utime_t now) = 0; + /** + * Get the default crc flags for this messenger. + * but not yet dispatched. + */ + static int get_default_crc_flags(md_config_t *); /** * @} // Accessors diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b071d79abc00..014c44cc0b06 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -716,7 +716,7 @@ void AsyncConnection::process() ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message" << dendl; - Message *message = decode_message(async_msgr->cct, current_header, footer, front, middle, data); + Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer, front, middle, data); if (!message) { ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl; goto fail; @@ -1947,7 +1947,7 @@ int AsyncConnection::_send(Message *m) << features << " " << m << " " << *m << dendl; // encode and copy out of *m - m->encode(features, !async_msgr->cct->_conf->ms_nocrc); + m->encode(features, async_msgr->crcflags); // prepare everything ceph_msg_header& header = m->get_header(); diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index 6f4f989668b1..6a3cb642b21e 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -1773,7 +1773,7 @@ void Pipe::writer() << " " << m << " " << *m << dendl; // encode and copy out of *m - m->encode(features, !msgr->cct->_conf->ms_nocrc); + m->encode(features, msgr->crcflags); // prepare everything ceph_msg_header& header = m->get_header(); @@ -1876,11 +1876,13 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) ceph_msg_header header; ceph_msg_footer footer; __u32 header_crc; - + if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { if (tcp_read((char*)&header, sizeof(header)) < 0) return -1; - header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); + if (msgr->crcflags & MSG_CRC_HEADER) { + header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); + } } else { ceph_msg_header_old oldheader; if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0) @@ -1889,8 +1891,10 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) memcpy(&header, &oldheader, sizeof(header)); header.src = oldheader.src.name; header.reserved = oldheader.reserved; - header.crc = oldheader.crc; - header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); + if (msgr->crcflags & MSG_CRC_HEADER) { + header.crc = oldheader.crc; + header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); + } } ldout(msgr->cct,20) << "reader got envelope type=" << header.type @@ -1901,7 +1905,8 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) << dendl; // verify header crc - if (header_crc != header.crc) { + if (!(msgr->crcflags & MSG_CRC_HEADER)) { + } else if (header_crc != header.crc) { ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; return -1; } @@ -2027,9 +2032,11 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) ceph_msg_footer_old old_footer; if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0) goto out_dethrottle; - footer.front_crc = old_footer.front_crc; - footer.middle_crc = old_footer.middle_crc; - footer.data_crc = old_footer.data_crc; + if (msgr->crcflags & MSG_CRC_HEADER) { + footer.front_crc = old_footer.front_crc; + footer.middle_crc = old_footer.middle_crc; + footer.data_crc = old_footer.data_crc; + } footer.sig = 0; footer.flags = old_footer.flags; } @@ -2045,7 +2052,7 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message" << dendl; - message = decode_message(msgr->cct, header, footer, front, middle, data); + message = decode_message(msgr->cct, msgr->crcflags, header, footer, front, middle, data); if (!message) { ret = -EINVAL; goto out_dethrottle; @@ -2241,8 +2248,12 @@ int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, buffer oldheader.src.addr = connection_state->get_peer_addr(); oldheader.orig_src = oldheader.src; oldheader.reserved = header.reserved; - oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, - sizeof(oldheader) - sizeof(oldheader.crc)); + if (msgr->crcflags & MSG_CRC_HEADER) { + oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, + sizeof(oldheader) - sizeof(oldheader.crc)); + } else { + oldheader.crc = 0; + } msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader; msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader); msglen += sizeof(oldheader); @@ -2305,9 +2316,13 @@ int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, buffer msglen += sizeof(footer); msg.msg_iovlen++; } else { - old_footer.front_crc = footer.front_crc; - old_footer.middle_crc = footer.middle_crc; - old_footer.data_crc = footer.data_crc; + if (msgr->crcflags & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + old_footer.data_crc = footer.data_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = old_footer.data_crc = 0; + } old_footer.flags = footer.flags; msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer; msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer); diff --git a/src/test/encoding/ceph_dencoder.cc b/src/test/encoding/ceph_dencoder.cc index d5068a1a4d46..7e1056503ce0 100644 --- a/src/test/encoding/ceph_dencoder.cc +++ b/src/test/encoding/ceph_dencoder.cc @@ -175,7 +175,7 @@ public: bufferlist::iterator p = bl.begin(); p.seek(seek); try { - Message *n = decode_message(g_ceph_context, p); + Message *n = decode_message(g_ceph_context, 0, p); if (!n) throw std::runtime_error("failed to decode"); if (n->get_type() != m_object->get_type()) {