list_move_tail(&m->list_head, &con->out_sent);
con->out_msg = m; /* we don't bother taking a reference here. */
- dout("prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
+ dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type),
- le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len),
+ le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
+ le32_to_cpu(m->hdr.data_len),
m->nr_pages);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
- /* tag + hdr + front */
+ /* tag + hdr + front + middle */
con->out_kvec[v].iov_base = &tag_msg;
con->out_kvec[v++].iov_len = 1;
con->out_kvec[v].iov_base = &m->hdr;
con->out_kvec[v++].iov_len = sizeof(m->hdr);
con->out_kvec[v++] = m->front;
+ if (m->middle.iov_len)
+ con->out_kvec[v++] = m->middle;
con->out_kvec_left = v;
- con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
+ con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
+ m->middle.iov_len;
con->out_kvec_cur = con->out_kvec;
/* fill in crc (except data pages), footer */
con->out_msg->footer.flags = 0;
con->out_msg->footer.front_crc =
cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
- con->out_msg->footer.middle_crc = 0;
+ if (m->middle.iov_base)
+ con->out_msg->footer.middle_crc =
+ cpu_to_le32(crc32c(0, m->middle.iov_base,
+ m->middle.iov_len));
+ else
+ con->out_msg->footer.middle_crc = 0;
con->out_msg->footer.data_crc = 0;
+ dout("prepare_write_message front_crc %u data_crc %u\n",
+ le32_to_cpu(con->out_msg->footer.front_crc),
+ le32_to_cpu(con->out_msg->footer.middle_crc));
/* is there a data payload? */
if (le32_to_cpu(m->hdr.data_len) > 0) {
ceph_msg_put(msg);
} else {
msg->hdr.seq = cpu_to_le64(++con->out_seq);
- dout("----- %p %u to %s%d %d=%s len %d+%d -----\n", msg,
+ dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg,
(unsigned)con->out_seq,
ENTITY_NAME(msg->hdr.dst.name), le16_to_cpu(msg->hdr.type),
ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
dout("ceph_msg_send %p seq %llu for %s%d on %p pgs %d\n",
msg, le64_to_cpu(msg->hdr.seq),
Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
- bufferlist& front, bufferlist& data)
+ bufferlist& front, bufferlist& middle, bufferlist& data)
{
// verify crc
if (!g_conf.ms_nocrc) {
__u32 front_crc = front.crc32c(0);
+ __u32 middle_crc = middle.crc32c(0);
__u32 data_crc = data.crc32c(0);
if (front_crc != footer.front_crc) {
- dout(0) << "bad crc in front " << front_crc << " != " << footer.front_crc << dendl;
+ dout(0) << "bad crc in front " << front_crc << " != exp " << footer.front_crc << dendl;
+ dout(20);
+ front.hexdump(*_dout);
+ *_dout << dendl;
+ return 0;
+ }
+ if (middle_crc != footer.middle_crc) {
+ dout(0) << "bad crc in middle " << middle_crc << " != exp " << footer.middle_crc << dendl;
+ dout(20);
+ middle.hexdump(*_dout);
+ *_dout << dendl;
return 0;
}
if (data_crc != footer.data_crc &&
!(footer.flags & CEPH_MSG_FOOTER_NOCRC)) {
- dout(0) << "bad crc in data " << data_crc << " != " << footer.data_crc << dendl;
+ dout(0) << "bad crc in data " << data_crc << " != exp " << footer.data_crc << dendl;
+ dout(20);
+ data.hexdump(*_dout);
+ *_dout << dendl;
return 0;
}
}
m->set_header(header);
m->set_footer(footer);
m->set_payload(front);
+ m->set_middle(middle);
m->set_data(data);
try {
catch (buffer::error *e) {
dout(0) << "failed to decode message of type " << type << ": " << *e << dendl;
delete e;
+ assert(0);
return 0;
}
ceph_msg_header header; // headerelope
ceph_msg_footer footer;
bufferlist payload; // "front" unaligned blob
+ bufferlist middle; // "middle" unaligned blob
bufferlist data; // data payload (page-alignment will be preserved where possible)
utime_t recv_stamp;
void set_footer(const ceph_msg_footer &e) { footer = e; }
ceph_msg_footer &get_footer() { return footer; }
- void clear_payload() { payload.clear(); }
+ void clear_payload() { payload.clear(); middle.clear(); }
bool empty_payload() { return payload.length() == 0; }
bufferlist& get_payload() { return payload; }
void set_payload(bufferlist& bl) { payload.claim(bl); }
void copy_payload(const bufferlist& bl) { payload = bl; }
+ void set_middle(bufferlist& bl) { middle.claim(bl); }
+ bufferlist& get_middle() { return middle; }
+
void set_data(const bufferlist &d) { data = d; }
void copy_data(const bufferlist &d) { data = d; }
bufferlist& get_data() { return data; }
}
void calc_front_crc() {
footer.front_crc = payload.crc32c(0);
+ footer.middle_crc = middle.crc32c(0);
}
void calc_data_crc() {
footer.data_crc = data.crc32c(0);
};
extern Message *decode_message(ceph_msg_header &header, ceph_msg_footer& footer,
- bufferlist& front, bufferlist& data);
+ bufferlist& front, bufferlist& middle, bufferlist& data);
inline ostream& operator<<(ostream& out, Message& m) {
m.print(out);
return out;
<< " <== " << m->get_source_inst()
<< " " << m->get_seq()
<< " ==== " << *m
- << " ==== " << m->get_payload().length() << "+" << m->get_data().length()
- << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
+ << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
+ << "+" << m->get_data().length()
+ << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
+ << " " << m->get_footer().data_crc << ")"
<< " " << m
<< dendl;
dispatch(m);
dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
// encode and copy out of *m
- if (m->empty_payload())
+ if (m->empty_payload())
m->encode_payload();
m->calc_front_crc();
// read front
bufferlist front;
- bufferptr bp;
int front_len = header.front_len;
if (front_len) {
- bp = buffer::create(front_len);
+ bufferptr bp = buffer::create(front_len);
if (tcp_read( sd, bp.c_str(), front_len ) < 0)
return 0;
front.push_back(bp);
dout(20) << "reader got front " << front.length() << dendl;
}
+ // read middle
+ bufferlist middle;
+ int middle_len = header.middle_len;
+ if (middle_len) {
+ bufferptr bp = buffer::create(middle_len);
+ if (tcp_read( sd, bp.c_str(), middle_len ) < 0)
+ return 0;
+ middle.push_back(bp);
+ dout(20) << "reader got middle " << middle.length() << dendl;
+ }
+
+
// read data
bufferlist data;
unsigned data_len = le32_to_cpu(header.data_len);
// head
int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
(unsigned)left);
- bp = buffer::create(head);
+ bufferptr bp = buffer::create(head);
if (tcp_read( sd, bp.c_str(), head ) < 0)
return 0;
data.push_back(bp);
// middle
int middle = left & PAGE_MASK;
if (middle > 0) {
- bp = buffer::create_page_aligned(middle);
+ bufferptr bp = buffer::create_page_aligned(middle);
if (tcp_read( sd, bp.c_str(), middle ) < 0)
return 0;
data.push_back(bp);
}
if (left) {
- bp = buffer::create(left);
+ bufferptr bp = buffer::create(left);
if (tcp_read( sd, bp.c_str(), left ) < 0)
return 0;
data.push_back(bp);
int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
dout(10) << "aborted = " << aborted << dendl;
if (aborted) {
- dout(0) << "reader got " << front.length() << " + " << data.length()
+ dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message from " << header.src << ".. ABORTED" << dendl;
// MEH FIXME
Message *m = new MGenericMessage(CEPH_MSG_PING);
return m;
}
- dout(20) << "reader got " << front.length() << " + " << data.length()
+ dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message from " << header.src << dendl;
- return decode_message(header, footer, front, data);
+ return decode_message(header, footer, front, middle, data);
}
// get envelope, buffers
header.front_len = m->get_payload().length();
+ header.middle_len = m->get_middle().length();
header.data_len = m->get_data().length();
footer.flags = 0;
m->calc_header_crc();
bufferlist blist = m->get_payload();
+ blist.append(m->get_middle());
blist.append(m->get_data());
dout(20) << "write_message " << m << " to " << header.dst << dendl;