]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: handle 'middle' section of message
authorSage Weil <sage@newdream.net>
Tue, 18 Aug 2009 19:16:42 +0000 (12:16 -0700)
committerSage Weil <sage@newdream.net>
Tue, 18 Aug 2009 19:16:42 +0000 (12:16 -0700)
src/kernel/messenger.c
src/msg/Message.cc
src/msg/Message.h
src/msg/SimpleMessenger.cc

index e9c702e95e9f9173e6b002791121676a16a12973..7016e8a4d0c91863ef749a44b93a72211b9759e0 100644 (file)
@@ -645,20 +645,24 @@ static void prepare_write_message(struct ceph_connection *con)
        list_move_tail(&m->list_head, &con->out_sent);
        con->out_msg = m;   /* we don't bother taking a reference here. */
 
-       dout("prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
+       dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
             m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type),
-            le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len),
+            le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
+            le32_to_cpu(m->hdr.data_len),
             m->nr_pages);
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
-       /* tag + hdr + front */
+       /* tag + hdr + front + middle */
        con->out_kvec[v].iov_base = &tag_msg;
        con->out_kvec[v++].iov_len = 1;
        con->out_kvec[v].iov_base = &m->hdr;
        con->out_kvec[v++].iov_len = sizeof(m->hdr);
        con->out_kvec[v++] = m->front;
+       if (m->middle.iov_len)
+               con->out_kvec[v++] = m->middle;
        con->out_kvec_left = v;
-       con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
+       con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
+               m->middle.iov_len;
        con->out_kvec_cur = con->out_kvec;
 
        /* fill in crc (except data pages), footer */
@@ -668,8 +672,16 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg->footer.flags = 0;
        con->out_msg->footer.front_crc =
                cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
-       con->out_msg->footer.middle_crc = 0;
+       if (m->middle.iov_base)
+               con->out_msg->footer.middle_crc =
+                       cpu_to_le32(crc32c(0, m->middle.iov_base,
+                                          m->middle.iov_len));
+       else
+               con->out_msg->footer.middle_crc = 0;
        con->out_msg->footer.data_crc = 0;
+       dout("prepare_write_message front_crc %u data_crc %u\n",
+            le32_to_cpu(con->out_msg->footer.front_crc),
+            le32_to_cpu(con->out_msg->footer.middle_crc));
 
        /* is there a data payload? */
        if (le32_to_cpu(m->hdr.data_len) > 0) {
@@ -2268,11 +2280,12 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                ceph_msg_put(msg);
        } else {
                msg->hdr.seq = cpu_to_le64(++con->out_seq);
-               dout("----- %p %u to %s%d %d=%s len %d+%d -----\n", msg,
+               dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg,
                     (unsigned)con->out_seq,
                     ENTITY_NAME(msg->hdr.dst.name), le16_to_cpu(msg->hdr.type),
                     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
                     le32_to_cpu(msg->hdr.front_len),
+                    le32_to_cpu(msg->hdr.middle_len),
                     le32_to_cpu(msg->hdr.data_len));
                dout("ceph_msg_send %p seq %llu for %s%d on %p pgs %d\n",
                     msg, le64_to_cpu(msg->hdr.seq),
index de03dc3679da245a848d591bd621fd80e890b36a..d3ec2bd6b13ec4925d644aa0c4cf8c2c2432b206 100644 (file)
@@ -130,20 +130,34 @@ using namespace std;
 
 
 Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
-                       bufferlist& front, bufferlist& data)
+                       bufferlist& front, bufferlist& middle, bufferlist& data)
 {
   // verify crc
   if (!g_conf.ms_nocrc) {
     __u32 front_crc = front.crc32c(0);
+    __u32 middle_crc = middle.crc32c(0);
     __u32 data_crc = data.crc32c(0);
 
     if (front_crc != footer.front_crc) {
-      dout(0) << "bad crc in front " << front_crc << " != " << footer.front_crc << dendl;
+      dout(0) << "bad crc in front " << front_crc << " != exp " << footer.front_crc << dendl;
+      dout(20);
+      front.hexdump(*_dout);
+      *_dout << dendl;
+      return 0;
+    }
+    if (middle_crc != footer.middle_crc) {
+      dout(0) << "bad crc in middle " << middle_crc << " != exp " << footer.middle_crc << dendl;
+      dout(20);
+      middle.hexdump(*_dout);
+      *_dout << dendl;
       return 0;
     }
     if (data_crc != footer.data_crc &&
         !(footer.flags & CEPH_MSG_FOOTER_NOCRC)) {
-      dout(0) << "bad crc in data " << data_crc << " != " << footer.data_crc << dendl;
+      dout(0) << "bad crc in data " << data_crc << " != exp " << footer.data_crc << dendl;
+      dout(20);
+      data.hexdump(*_dout);
+      *_dout << dendl;
       return 0;
     }
   }
@@ -472,6 +486,7 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
   m->set_header(header);
   m->set_footer(footer);
   m->set_payload(front);
+  m->set_middle(middle);
   m->set_data(data);
 
   try {
@@ -480,6 +495,7 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
   catch (buffer::error *e) {
     dout(0) << "failed to decode message of type " << type << ": " << *e << dendl;
     delete e;
+    assert(0);
     return 0;
   }
 
index 2166fbc2b8aee30ea87ac006ac7671b04e16a1e1..386df64b3bfddb26da20e07ea95a82506ee3cd5f 100644 (file)
@@ -138,6 +138,7 @@ protected:
   ceph_msg_header  header;      // headerelope
   ceph_msg_footer  footer;
   bufferlist       payload;  // "front" unaligned blob
+  bufferlist       middle;   // "middle" unaligned blob
   bufferlist       data;     // data payload (page-alignment will be preserved where possible)
   
   utime_t recv_stamp;
@@ -181,12 +182,15 @@ public:
   void set_footer(const ceph_msg_footer &e) { footer = e; }
   ceph_msg_footer &get_footer() { return footer; }
 
-  void clear_payload() { payload.clear(); }
+  void clear_payload() { payload.clear(); middle.clear(); }
   bool empty_payload() { return payload.length() == 0; }
   bufferlist& get_payload() { return payload; }
   void set_payload(bufferlist& bl) { payload.claim(bl); }
   void copy_payload(const bufferlist& bl) { payload = bl; }
 
+  void set_middle(bufferlist& bl) { middle.claim(bl); }
+  bufferlist& get_middle() { return middle; }
+
   void set_data(const bufferlist &d) { data = d; }
   void copy_data(const bufferlist &d) { data = d; }
   bufferlist& get_data() { return data; }
@@ -201,6 +205,7 @@ public:
   }
   void calc_front_crc() {
     footer.front_crc = payload.crc32c(0);
+    footer.middle_crc = middle.crc32c(0);
   }
   void calc_data_crc() {
     footer.data_crc = data.crc32c(0);
@@ -242,7 +247,7 @@ public:
 };
 
 extern Message *decode_message(ceph_msg_header &header, ceph_msg_footer& footer,
-                              bufferlist& front, bufferlist& data);
+                              bufferlist& front, bufferlist& middle, bufferlist& data);
 inline ostream& operator<<(ostream& out, Message& m) {
   m.print(out);
   return out;
index 03327183531cae650dea30191beadf8aa3d7cdf6..542fa3948e4703ff374646dd6e483340148371ce 100644 (file)
@@ -311,8 +311,10 @@ void SimpleMessenger::Endpoint::dispatch_entry()
                    << " <== " << m->get_source_inst()
                    << " " << m->get_seq()
                    << " ==== " << *m
-                   << " ==== " << m->get_payload().length() << "+" << m->get_data().length()
-                   << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
+                   << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
+                   << "+" << m->get_data().length()
+                   << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
+                   << " " << m->get_footer().data_crc << ")"
                    << " " << m 
                    << dendl;
            dispatch(m);
@@ -1411,7 +1413,7 @@ void SimpleMessenger::Pipe::writer()
         dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
 
        // encode and copy out of *m
-        if (m->empty_payload()) 
+        if (m->empty_payload())
          m->encode_payload();
        m->calc_front_crc();
 
@@ -1501,16 +1503,27 @@ Message *SimpleMessenger::Pipe::read_message()
 
   // read front
   bufferlist front;
-  bufferptr bp;
   int front_len = header.front_len;
   if (front_len) {
-    bp = buffer::create(front_len);
+    bufferptr bp = buffer::create(front_len);
     if (tcp_read( sd, bp.c_str(), front_len ) < 0) 
       return 0;
     front.push_back(bp);
     dout(20) << "reader got front " << front.length() << dendl;
   }
 
+  // read middle
+  bufferlist middle;
+  int middle_len = header.middle_len;
+  if (middle_len) {
+    bufferptr bp = buffer::create(middle_len);
+    if (tcp_read( sd, bp.c_str(), middle_len ) < 0) 
+      return 0;
+    middle.push_back(bp);
+    dout(20) << "reader got middle " << middle.length() << dendl;
+  }
+
+
   // read data
   bufferlist data;
   unsigned data_len = le32_to_cpu(header.data_len);
@@ -1521,7 +1534,7 @@ Message *SimpleMessenger::Pipe::read_message()
       // head
       int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
                     (unsigned)left);
-      bp = buffer::create(head);
+      bufferptr bp = buffer::create(head);
       if (tcp_read( sd, bp.c_str(), head ) < 0) 
        return 0;
       data.push_back(bp);
@@ -1532,7 +1545,7 @@ Message *SimpleMessenger::Pipe::read_message()
     // middle
     int middle = left & PAGE_MASK;
     if (middle > 0) {
-      bp = buffer::create_page_aligned(middle);
+      bufferptr bp = buffer::create_page_aligned(middle);
       if (tcp_read( sd, bp.c_str(), middle ) < 0) 
        return 0;
       data.push_back(bp);
@@ -1541,7 +1554,7 @@ Message *SimpleMessenger::Pipe::read_message()
     }
 
     if (left) {
-      bp = buffer::create(left);
+      bufferptr bp = buffer::create(left);
       if (tcp_read( sd, bp.c_str(), left ) < 0) 
        return 0;
       data.push_back(bp);
@@ -1556,7 +1569,7 @@ Message *SimpleMessenger::Pipe::read_message()
   int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
   dout(10) << "aborted = " << aborted << dendl;
   if (aborted) {
-    dout(0) << "reader got " << front.length() << " + " << data.length()
+    dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
            << " byte message from " << header.src << ".. ABORTED" << dendl;
     // MEH FIXME 
     Message *m = new MGenericMessage(CEPH_MSG_PING);
@@ -1565,9 +1578,9 @@ Message *SimpleMessenger::Pipe::read_message()
     return m;
   }
 
-  dout(20) << "reader got " << front.length() << " + " << data.length()
+  dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
           << " byte message from " << header.src << dendl;
-  return decode_message(header, footer, front, data);
+  return decode_message(header, footer, front, middle, data);
 }
 
 
@@ -1649,11 +1662,13 @@ int SimpleMessenger::Pipe::write_message(Message *m)
 
   // get envelope, buffers
   header.front_len = m->get_payload().length();
+  header.middle_len = m->get_middle().length();
   header.data_len = m->get_data().length();
   footer.flags = 0;
   m->calc_header_crc();
 
   bufferlist blist = m->get_payload();
+  blist.append(m->get_middle());
   blist.append(m->get_data());
   
   dout(20)  << "write_message " << m << " to " << header.dst << dendl;