]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: crc configuration in messenger
authorCasey Bodley <casey@linuxbox.com>
Wed, 3 Sep 2014 16:29:17 +0000 (12:29 -0400)
committerMatt Benjamin <matt@cohortfs.com>
Wed, 14 Jan 2015 21:41:32 +0000 (16:41 -0500)
Add new header_crc and data_crc configuration booleans, and use
them consistently to govern whether CRC is performed in the
Message encode, decode, and transit paths.

Remove ms_nocrc, changes per Sage.
Mimimally adapt AsyncMessenger for crcflags.

Signed-off-by: Casey Bodley <casey@linuxbox.com>
Signed-off-by: Matt Benjamin <matt@cohortfs.com>
src/common/config_opts.h
src/messages/MForward.h
src/messages/MRoute.h
src/mon/Monitor.cc
src/msg/Message.cc
src/msg/Message.h
src/msg/Messenger.cc
src/msg/Messenger.h
src/msg/async/AsyncConnection.cc
src/msg/simple/Pipe.cc
src/test/encoding/ceph_dencoder.cc

index 08739c417205ed546478ce9183a321b229da59ed..a1e03d2aa894f7867a28b6fb2736edf5506636ea 100644 (file)
@@ -121,7 +121,8 @@ OPTION(ms_tcp_rcvbuf, OPT_INT, 0)
 OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
 OPTION(ms_initial_backoff, OPT_DOUBLE, .2)
 OPTION(ms_max_backoff, OPT_DOUBLE, 15.0)
-OPTION(ms_nocrc, OPT_BOOL, false)
+OPTION(ms_crc_data, OPT_BOOL, true)
+OPTION(ms_crc_header, OPT_BOOL, true)
 OPTION(ms_die_on_bad_msg, OPT_BOOL, false)
 OPTION(ms_die_on_unhandled_msg, OPT_BOOL, false)
 OPTION(ms_die_on_old_message, OPT_BOOL, false)     // assert if we get a dup incoming message and shouldn't have (may be triggered by pre-541cd3c64be0dfa04e8a2df39422e0eb9541a428 code)
index c19865428c2b9e85de58066d8fd5a070c8eaffdb..92a739364ca632fc989180dfa98238eafc137a28 100644 (file)
@@ -71,7 +71,7 @@ public:
     ::decode(tid, p);
     ::decode(client, p);
     ::decode(client_caps, p);
-    msg = (PaxosServiceMessage *)decode_message(NULL, p);
+    msg = (PaxosServiceMessage *)decode_message(NULL, 0, p);
     if (header.version >= 2) {
       ::decode(con_features, p);
     } else {
index 49f639ba1f0fc4d9ffbaca7e5aa498e76c86c65b..d7a826e2dec7da5ee3fcd919e59021a681cee18c 100644 (file)
@@ -35,7 +35,7 @@ struct MRoute : public Message {
   MRoute(bufferlist bl, const entity_inst_t& i)
     : Message(MSG_ROUTE, HEAD_VERSION, COMPAT_VERSION), session_mon_tid(0), dest(i) {
     bufferlist::iterator p = bl.begin();
-    msg = decode_message(NULL, p);
+    msg = decode_message(NULL, 0, p);
   }
 private:
   ~MRoute() {
@@ -51,9 +51,9 @@ public:
       bool m;
       ::decode(m, p);
       if (m)
-       msg = decode_message(NULL, p);
+       msg = decode_message(NULL, 0, p);
     } else {
-      msg = decode_message(NULL, p);
+      msg = decode_message(NULL, 0, p);
     }
   }
   void encode_payload(uint64_t features) {
index 9d2880d9b1e81298ba5115cb921d76ee4fbda898..2f664316c702f4db724794c25586c763b3aa5708 100644 (file)
@@ -2974,7 +2974,7 @@ void Monitor::resend_routed_requests()
     RoutedRequest *rr = p->second;
 
     bufferlist::iterator q = rr->request_bl.begin();
-    PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q);
+    PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
 
     if (mon == rank) {
       dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
index 9b652f1293b47efc2936672147e7776e7807af6c..f103602a8b1027d0e85d1878b9a8115b9ed42623 100644 (file)
@@ -173,7 +173,7 @@ using namespace std;
 
 #define dout_subsys ceph_subsys_ms
 
-void Message::encode(uint64_t features, bool datacrc)
+void Message::encode(uint64_t features, int crcflags)
 {
   // encode and copy out of *m
   if (empty_payload()) {
@@ -184,17 +184,19 @@ void Message::encode(uint64_t features, bool datacrc)
     if (header.compat_version == 0)
       header.compat_version = header.version;
   }
-  calc_front_crc();
+  if (crcflags & MSG_CRC_HEADER)
+    calc_front_crc();
 
   // update envelope
   header.front_len = get_payload().length();
   header.middle_len = get_middle().length();
   header.data_len = get_data().length();
-  calc_header_crc();
+  if (crcflags & MSG_CRC_HEADER)
+    calc_header_crc();
 
   footer.flags = CEPH_MSG_FOOTER_COMPLETE;
 
-  if (datacrc) {
+  if (crcflags & MSG_CRC_DATA) {
     calc_data_crc();
 
 #ifdef ENCODE_DUMP
@@ -246,11 +248,14 @@ void Message::dump(Formatter *f) const
   f->dump_string("summary", ss.str());
 }
 
-Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_footer& footer,
-                       bufferlist& front, bufferlist& middle, bufferlist& data)
+Message *decode_message(CephContext *cct, int crcflags,
+                       ceph_msg_header& header,
+                       ceph_msg_footer& footer,
+                       bufferlist& front, bufferlist& middle,
+                       bufferlist& data)
 {
   // verify crc
-  if (!cct || !cct->_conf->ms_nocrc) {
+  if (crcflags & MSG_CRC_HEADER) {
     __u32 front_crc = front.crc32c(0);
     __u32 middle_crc = middle.crc32c(0);
 
@@ -788,7 +793,7 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload)
 // We've slipped in a 0 signature at this point, so any signature checking after this will
 // fail.  PLR
 
-Message *decode_message(CephContext *cct, bufferlist::iterator& p)
+Message *decode_message(CephContext *cct, int crcflags, bufferlist::iterator& p)
 {
   ceph_msg_header h;
   ceph_msg_footer_old fo;
@@ -804,6 +809,6 @@ Message *decode_message(CephContext *cct, bufferlist::iterator& p)
   ::decode(fr, p);
   ::decode(mi, p);
   ::decode(da, p);
-  return decode_message(cct, h, f, fr, mi, da);
+  return decode_message(cct, crcflags, h, f, fr, mi, da);
 }
 
index bf447ab1f982236ab3efe793425190397c04a368..0d45c2dc8f8747cd2550b951cbf257c08ad63b98 100644 (file)
 #define MSG_TIMECHECK             0x600
 #define MSG_MON_HEALTH            0x601
 
+// *** Message::encode() crcflags bits ***
+#define MSG_CRC_DATA           1
+#define MSG_CRC_HEADER         2
+
 
 // ======================================================
 
@@ -422,11 +426,12 @@ public:
 
   virtual void dump(Formatter *f) const;
 
-  void encode(uint64_t features, bool datacrc);
+  void encode(uint64_t features, int crcflags);
 };
 typedef boost::intrusive_ptr<Message> MessageRef;
 
-extern Message *decode_message(CephContext *cct, ceph_msg_header &header,
+extern Message *decode_message(CephContext *cct, int crcflags,
+                              ceph_msg_header &header,
                               ceph_msg_footer& footer, bufferlist& front,
                               bufferlist& middle, bufferlist& data);
 inline ostream& operator<<(ostream& out, Message& m) {
@@ -437,6 +442,7 @@ inline ostream& operator<<(ostream& out, Message& m) {
 }
 
 extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
-extern Message *decode_message(CephContext *cct, bufferlist::iterator& bl);
+extern Message *decode_message(CephContext *cct, int crcflags,
+                               bufferlist::iterator& bl);
 
 #endif
index 766cf172f2e5c64cd404e99b6e2365914faef8cf..be0091d72a9418274e028bd33b4f77f372c907c4 100644 (file)
@@ -1,3 +1,5 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
 
 #include "include/types.h"
 #include "Messenger.h"
@@ -20,3 +22,18 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
   lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
   return NULL;
 }
+
+/*
+ * Pre-calculate desired software CRC settings.  CRC computation may
+ * be disabled by default for some transports (e.g., those with strong
+ * hardware checksum support).
+ */
+int Messenger::get_default_crc_flags(md_config_t * conf)
+{
+  int r = 0;
+  if (conf->ms_crc_data)
+    r |= MSG_CRC_DATA;
+  if (conf->ms_crc_header)
+    r |= MSG_CRC_HEADER;
+  return r;
+}
index d6b542c4990fc3571c1e50db82ff9333698f870c..b21bb336d26b6fcd56aa5305d1b66627d7bd8d6a 100644 (file)
@@ -54,6 +54,7 @@ public:
    *  from this value.
    */
   CephContext *cct;
+  int crcflags;
 
   /**
    * A Policy describes the rules of a Connection. Is there a limit on how
@@ -126,7 +127,8 @@ public:
   Messenger(CephContext *cct_, entity_name_t w)
     : my_inst(),
       default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
-      cct(cct_)
+      cct(cct_),
+      crcflags(get_default_crc_flags(cct->_conf))
   {
     my_inst.name = w;
   }
@@ -216,6 +218,11 @@ public:
    * (0 if the queue is empty)
    */
   virtual double get_dispatch_queue_max_age(utime_t now) = 0;
+  /**
+   * Get the default crc flags for this messenger.
+   * but not yet dispatched.
+   */
+  static int get_default_crc_flags(md_config_t *);
 
   /**
    * @} // Accessors
index b071d79abc00d224451de9581c4e7f09af7843b6..014c44cc0b06140c8a111ce39083a8b2e425111f 100644 (file)
@@ -716,7 +716,7 @@ void AsyncConnection::process()
 
           ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length()
                               << " + " << data.length() << " byte message" << dendl;
-          Message *message = decode_message(async_msgr->cct, current_header, footer, front, middle, data);
+          Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer, front, middle, data);
           if (!message) {
             ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl;
             goto fail;
@@ -1947,7 +1947,7 @@ int AsyncConnection::_send(Message *m)
                          << features << " " << m << " " << *m << dendl;
 
   // encode and copy out of *m
-  m->encode(features, !async_msgr->cct->_conf->ms_nocrc);
+  m->encode(features, async_msgr->crcflags);
 
   // prepare everything
   ceph_msg_header& header = m->get_header();
index 6f4f989668b1d8c779c6f2225d990b1c51046b52..6a3cb642b21e77b030750ac6f8006fc3e6110f65 100644 (file)
@@ -1773,7 +1773,7 @@ void Pipe::writer()
                              << " " << m << " " << *m << dendl;
 
        // encode and copy out of *m
-       m->encode(features, !msgr->cct->_conf->ms_nocrc);
+       m->encode(features, msgr->crcflags);
 
        // prepare everything
        ceph_msg_header& header = m->get_header();
@@ -1876,11 +1876,13 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
   ceph_msg_header header; 
   ceph_msg_footer footer;
   __u32 header_crc;
-  
+
   if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
     if (tcp_read((char*)&header, sizeof(header)) < 0)
       return -1;
-    header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+    }
   } else {
     ceph_msg_header_old oldheader;
     if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
@@ -1889,8 +1891,10 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
     memcpy(&header, &oldheader, sizeof(header));
     header.src = oldheader.src.name;
     header.reserved = oldheader.reserved;
-    header.crc = oldheader.crc;
-    header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      header.crc = oldheader.crc;
+      header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
+    }
   }
 
   ldout(msgr->cct,20) << "reader got envelope type=" << header.type
@@ -1901,7 +1905,8 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
            << dendl;
 
   // verify header crc
-  if (header_crc != header.crc) {
+  if (!(msgr->crcflags & MSG_CRC_HEADER)) {
+  } else if (header_crc != header.crc) {
     ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
     return -1;
   }
@@ -2027,9 +2032,11 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
     ceph_msg_footer_old old_footer;
     if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
       goto out_dethrottle;
-    footer.front_crc = old_footer.front_crc;
-    footer.middle_crc = old_footer.middle_crc;
-    footer.data_crc = old_footer.data_crc;
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      footer.front_crc = old_footer.front_crc;
+      footer.middle_crc = old_footer.middle_crc;
+      footer.data_crc = old_footer.data_crc;
+    }
     footer.sig = 0;
     footer.flags = old_footer.flags;
   }
@@ -2045,7 +2052,7 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
 
   ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
           << " byte message" << dendl;
-  message = decode_message(msgr->cct, header, footer, front, middle, data);
+  message = decode_message(msgr->cct, msgr->crcflags, header, footer, front, middle, data);
   if (!message) {
     ret = -EINVAL;
     goto out_dethrottle;
@@ -2241,8 +2248,12 @@ int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, buffer
     oldheader.src.addr = connection_state->get_peer_addr();
     oldheader.orig_src = oldheader.src;
     oldheader.reserved = header.reserved;
-    oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
-                               sizeof(oldheader) - sizeof(oldheader.crc));
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+       oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
+                                   sizeof(oldheader) - sizeof(oldheader.crc));
+    } else {
+       oldheader.crc = 0;
+    }
     msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
     msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
     msglen += sizeof(oldheader);
@@ -2305,9 +2316,13 @@ int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, buffer
     msglen += sizeof(footer);
     msg.msg_iovlen++;
   } else {
-    old_footer.front_crc = footer.front_crc;   
-    old_footer.middle_crc = footer.middle_crc;   
-    old_footer.data_crc = footer.data_crc;   
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      old_footer.front_crc = footer.front_crc;
+      old_footer.middle_crc = footer.middle_crc;
+      old_footer.data_crc = footer.data_crc;
+    } else {
+       old_footer.front_crc = old_footer.middle_crc = old_footer.data_crc = 0;
+    }
     old_footer.flags = footer.flags;   
     msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
     msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
index d5068a1a4d468fa7441649ff91d2d8e1ce8ed510..7e1056503ce016de617cc70fe5995697290d8567 100644 (file)
@@ -175,7 +175,7 @@ public:
     bufferlist::iterator p = bl.begin();
     p.seek(seek);
     try {
-      Message *n = decode_message(g_ceph_context, p);
+      Message *n = decode_message(g_ceph_context, 0, p);
       if (!n)
        throw std::runtime_error("failed to decode");
       if (n->get_type() != m_object->get_type()) {